Coverage for aixcalibuha/sensitivity_analysis/sensitivity_analyzer.py: 94%
407 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-01-27 10:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-01-27 10:48 +0000
1"""Package containing modules for sensitivity analysis.
2The module contains the relevant base-classes."""
3import abc
4import copy
5import os
6import pathlib
7import multiprocessing as mp
8from typing import List
9from collections import Counter
10import numpy as np
11import pandas as pd
12from ebcpy.utils import setup_logger
13from ebcpy.utils.reproduction import CopyFile
14from ebcpy.simulationapi import SimulationAPI
15from aixcalibuha import CalibrationClass, data_types
16from aixcalibuha import utils
17from aixcalibuha.sensitivity_analysis.plotting import plot_single, plot_time_dependent
20def _load_single_file(_filepath, parquet_engine='pyarrow'):
21 """Helper function"""
22 if _filepath is None:
23 return None
24 return data_types.TimeSeriesData(_filepath, default_tag='sim', key='simulation',
25 engine=parquet_engine)
28def _load_files(_filepaths, parquet_engine='pyarrow'):
29 """Helper function"""
30 results = []
31 for _filepath in _filepaths:
32 results.append(_load_single_file(_filepath, parquet_engine=parquet_engine))
33 return results
36def _restruct_verbose(list_output_verbose):
37 """Helper function"""
38 output_verbose = {}
39 for key, val in list_output_verbose[0].items():
40 output_verbose[key] = np.array([])
41 for i in list_output_verbose:
42 for key, val in i.items():
43 output_verbose[key] = np.append(output_verbose[key], np.array([val[1]]))
44 return output_verbose
47def _concat_all_sims(sim_results_list):
48 """Helper function that concat all results in a list to one DataFrame."""
49 sim_results_list = [r.to_df() for r in sim_results_list]
50 sim_results_list = pd.concat(sim_results_list, keys=range(len(sim_results_list)),
51 axis='columns')
52 sim_results_list = sim_results_list.swaplevel(axis=1).sort_index(axis=1)
53 return sim_results_list
56def _restruct_time_dependent(sen_time_dependent_list, time_index):
57 """Helper function that restructures the time dependent sensitivity results."""
59 def _restruct_single(sen_time_dependent_list_s, second_order=False):
60 sen_time_dependent_df = pd.concat(sen_time_dependent_list_s, keys=time_index, axis=0)
61 sen_time_dependent_df = sen_time_dependent_df.droplevel('Class', axis='index')
62 sen_time_dependent_df = sen_time_dependent_df.swaplevel(0, 1)
63 sen_time_dependent_df = sen_time_dependent_df.swaplevel(1, 2).sort_index(axis=0)
64 if second_order:
65 sen_time_dependent_df = sen_time_dependent_df.swaplevel(2, 3).sort_index(axis=0)
66 sen_time_dependent_df.index.set_names(
67 ['Goal', 'Analysis variable', 'Interaction', 'time'], inplace=True)
68 else:
69 sen_time_dependent_df.index.set_names(['Goal', 'Analysis variable', 'time'],
70 inplace=True)
71 return sen_time_dependent_df
73 if isinstance(sen_time_dependent_list[0], tuple):
74 sen_time_dependent_list1, sen_time_dependent_list2 = zip(*sen_time_dependent_list)
75 return _restruct_single(sen_time_dependent_list1), _restruct_single(
76 sen_time_dependent_list2, True)
77 return _restruct_single(sen_time_dependent_list)
80def _divide_chunks(long_list, chunk_length):
81 """Helper function that divides all list into multiple list with a specific chunk length."""
82 for i in range(0, len(long_list), chunk_length):
83 yield long_list[i:i + chunk_length]
86class SenAnalyzer(abc.ABC):
87 """
88 Class to perform a Sensitivity Analysis.
90 :param SimulationAPI sim_api:
91 Simulation-API used to simulate the samples
92 :param int num_samples:
93 The parameter `N` to the sampler methods of sobol and morris. NOTE: This is not the
94 number of samples produced, but relates to the total number of samples produced in
95 a manner dependent on the sampler method used. See the documentation of the specific
96 method in the SALib for more information.
97 :keyword str,os.path.normpath cd:
98 The path for the current working directory.
99 Logger and results will be stored here.
100 :keyword boolean fail_on_error:
101 Default is False. If True, the calibration will stop with an error if
102 the simulation fails. See also: ``ret_val_on_error``
103 :keyword float,np.NAN ret_val_on_error:
104 Default is np.NAN. If ``fail_on_error`` is false, you can specify here
105 which value to return in the case of a failed simulation. Possible
106 options are np.NaN, np.inf or some other high numbers. be aware that this
107 max influence the solver.
108 :keyword boolean save_files:
109 Default False. If true, all simulation files for each iteration will be saved!
110 :keyword str suffix_files:
111 Default 'csv'. Specifies the data format to store the simulation files in.
112 Options are 'csv', 'hdf', 'parquet'.
113 :keyword str parquet_engine:
114 The engine to use for the data format parquet.
115 Supported options can be extracted
116 from the ebcpy.TimeSeriesData.save() function.
117 Default is 'pyarrow'.
118 :keyword str,os.path.normpath savepath_sim:
119 Default is cd. Own directory for the time series data sets of all simulations
120 during the sensitivity analysis. The own dir can be necessary for large data sets,
121 because they can crash IDE during indexing when they are in the project folder.
123 """
125 def __init__(self,
126 sim_api: SimulationAPI,
127 num_samples: int,
128 **kwargs):
129 """Instantiate class parameters"""
130 # Setup the instance attributes
131 self.sim_api = sim_api
132 self.num_samples = num_samples
134 # Update kwargs
135 self.fail_on_error = kwargs.pop("fail_on_error", True)
136 self.save_files = kwargs.pop("save_files", False)
137 self.suffix_files = kwargs.pop('suffix_files', 'csv')
138 self.parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
139 self.ret_val_on_error = kwargs.pop("ret_val_on_error", np.NAN)
140 self.cd = kwargs.pop("cd", os.getcwd())
141 self.savepath_sim = kwargs.pop('savepath_sim', self.cd)
143 if isinstance(self.cd, str):
144 self.cd = pathlib.Path(self.cd)
145 if isinstance(self.savepath_sim, str):
146 self.savepath_sim = pathlib.Path(self.savepath_sim)
148 # Setup the logger
149 self.logger = setup_logger(cd=self.cd, name=self.__class__.__name__)
151 # Setup default values
152 self.problem: dict = None
153 self.reproduction_files = []
155 @property
156 @abc.abstractmethod
157 def analysis_variables(self) -> List[str]:
158 """
159 Indicate which variables are
160 able to be selected for analysis
162 :return:
163 A list of strings
164 :rtype: List[str]
165 """
166 raise NotImplementedError(f'{self.__class__.__name__}.analysis_variables '
167 f'property is not defined yet')
169 @abc.abstractmethod
170 def analysis_function(self, x, y):
171 """
172 Use the method to analyze the simulation results.
174 :param np.array x:
175 the `X` parameter of the method (The NumPy matrix containing the model inputs)
176 :param np.array y:
177 The NumPy array containing the model outputs
178 """
179 raise NotImplementedError(f'{self.__class__.__name__}.analysis_function '
180 f'function is not defined yet')
182 @abc.abstractmethod
183 def create_sampler_demand(self) -> dict:
184 """
185 Return the sampler parameters
187 :return:
188 dict: A dict with the sampler demand
189 """
190 raise NotImplementedError(f'{self.__class__.__name__}.analysis_function '
191 f'function is not defined yet')
193 @abc.abstractmethod
194 def generate_samples(self):
195 """
196 Run the sampler specified by `method` and return the results.
198 :return:
199 The list of samples generated as a NumPy array with one row per sample
200 and each row containing one value for each variable name in `problem['names']`.
201 :rtype: np.ndarray
202 """
203 raise NotImplementedError(f'{self.__class__.__name__}.generate_samples '
204 f'function is not defined yet')
206 def simulate_samples(self, cal_class, **kwargs):
207 """
208 Creates the samples for the calibration class and simulates them.
210 :param cal_class:
211 One class for calibration. Goals and tuner_paras have to be set
212 :keyword scale:
213 Default is False. If True the bounds of the tuner-parameters
214 will be scaled between 0 and 1.
216 :return:
217 Returns two lists. First a list with the simulation results for each sample.
218 If save_files the list contains the filepaths to the results
219 Second a list of the samples.
220 :rtype: list
221 """
222 scale = kwargs.pop('scale', False)
223 # Set the output interval according the given Goals
224 mean_freq = cal_class.goals.get_meas_frequency()
225 self.logger.info("Setting output_interval of simulation according "
226 "to measurement target data frequency: %s", mean_freq)
227 self.sim_api.sim_setup.output_interval = mean_freq
228 initial_names = cal_class.tuner_paras.get_names()
229 self.sim_api.set_sim_setup({"start_time": cal_class.start_time,
230 "stop_time": cal_class.stop_time})
231 self.sim_api.result_names = cal_class.goals.get_sim_var_names()
233 self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
234 samples = self.generate_samples()
236 # creat df of samples with the result_file_names as the index
237 result_file_names = [f"simulation_{idx}" for idx in range(len(samples))]
238 samples_df = pd.DataFrame(samples, columns=initial_names, index=result_file_names)
239 samples_df.to_csv(self.cd.joinpath(f'samples_{cal_class.name}.csv'))
241 # Simulate the current values
242 parameters = []
243 for initial_values in samples:
244 if scale:
245 initial_values = cal_class.tuner_paras.descale(initial_values)
246 parameters.append(dict(zip(initial_names, initial_values)))
248 self.logger.info('Starting %s parameter variations on %s cores',
249 len(samples), self.sim_api.n_cpu)
250 if self.save_files:
251 sim_dir = self.savepath_sim.joinpath(f'simulations_{cal_class.name}')
252 os.makedirs(sim_dir, exist_ok=True)
253 samples_df.to_csv(self.savepath_sim.joinpath(f'samples_{cal_class.name}.csv'))
254 self.logger.info(f'Saving simulation files in: {sim_dir}')
255 _filepaths = self.sim_api.simulate(
256 parameters=parameters,
257 return_option="savepath",
258 savepath=sim_dir,
259 result_file_name=result_file_names,
260 result_file_suffix=self.suffix_files,
261 parquet_engine=self.parquet_engine,
262 fail_on_error=self.fail_on_error,
263 inputs=cal_class.inputs,
264 **cal_class.input_kwargs
265 )
266 self.reproduction_files.extend(_filepaths)
267 results = _filepaths
268 else:
269 results = self.sim_api.simulate(
270 parameters=parameters,
271 inputs=cal_class.inputs,
272 fail_on_error=self.fail_on_error,
273 **cal_class.input_kwargs
274 )
275 self.logger.info('Finished %s simulations', len(samples))
276 return results, samples
278 def _check_index(self, tsd: data_types.TimeSeriesData, sim_num=None):
279 freq = tsd.frequency
280 if sim_num is None:
281 sim_num = tsd.filepath.name
282 if freq[0] != self.sim_api.sim_setup.output_interval:
283 self.logger.info(
284 f'The mean value of the frequency from {sim_num} does not match output '
285 'interval index will be cleaned and spaced equally')
286 tsd.to_datetime_index()
287 tsd.clean_and_space_equally(f'{str(self.sim_api.sim_setup.output_interval * 1000)}ms')
288 tsd.to_float_index()
289 freq = tsd.frequency
290 if freq[1] > 0.0:
291 self.logger.info(f'The standard deviation of the frequency from {sim_num} is to high '
292 f'and will be rounded to the accuracy of the output interval')
293 tsd.index = np.round(tsd.index.astype("float64"),
294 str(self.sim_api.sim_setup.output_interval)[::-1].find('.'))
295 return tsd
297 def _single_eval_statistical_measure(self, kwargs_eval):
298 """Evaluates statistical measure of one result"""
299 cal_class = kwargs_eval.pop('cal_class')
300 result = kwargs_eval.pop('result')
301 num_sim = kwargs_eval.pop('sim_num', None)
302 if result is None:
303 verbose_error = {}
304 for goal, weight in zip(cal_class.goals.get_goals_list(), cal_class.goals.weightings):
305 verbose_error[goal] = (weight, self.ret_val_on_error)
306 return self.ret_val_on_error, verbose_error
307 result = self._check_index(result, num_sim)
308 cal_class.goals.set_sim_target_data(result)
309 cal_class.goals.set_relevant_time_intervals(cal_class.relevant_intervals)
310 # Evaluate the current objective
311 total_res, verbose_calculation = cal_class.goals.eval_difference(verbose=True)
312 return total_res, verbose_calculation
314 def eval_statistical_measure(self, cal_class, results, verbose=True):
315 """Evaluates statistical measures of results on single core"""
316 self.logger.info('Starting evaluation of statistical measure')
317 output = []
318 list_output_verbose = []
319 for i, result in enumerate(results):
320 total_res, verbose_calculation = self._single_eval_statistical_measure(
321 {'cal_class': cal_class, 'result': result, 'sim_num': f'simulation_{i}'}
322 )
323 output.append(total_res)
324 list_output_verbose.append(verbose_calculation)
325 if verbose:
326 # restructure output_verbose
327 output_verbose = _restruct_verbose(list_output_verbose)
328 return np.asarray(output), output_verbose
329 return np.asarray(output)
331 def _single_load_eval_file(self, kwargs_load_eval):
332 """For multiprocessing"""
333 filepath = kwargs_load_eval.pop('filepath')
334 _result = _load_single_file(filepath, self.parquet_engine)
335 kwargs_load_eval.update({'result': _result})
336 total_res, verbose_calculation = self._single_eval_statistical_measure(kwargs_load_eval)
337 return total_res, verbose_calculation
339 def _mp_load_eval(self, _filepaths, cal_class, n_cpu):
340 """
341 Loading and evaluating the statistical measure of saved simulation files on multiple cores
342 """
343 self.logger.info(f'Load files and evaluate statistical measure on {n_cpu} processes.')
344 kwargs_load_eval = []
345 for filepath in _filepaths:
346 kwargs_load_eval.append({'filepath': filepath, 'cal_class': cal_class})
347 output_array = []
348 list_output_verbose = []
349 with mp.Pool(processes=n_cpu) as pool:
350 for total, verbose in pool.imap(self._single_load_eval_file, kwargs_load_eval):
351 output_array.append(total)
352 list_output_verbose.append(verbose)
353 output_array = np.asarray(output_array)
354 output_verbose = _restruct_verbose(list_output_verbose)
355 return output_array, output_verbose
357 def _load_eval(self, _filepaths, cal_class, n_cpu):
358 """
359 Loading and evaluating the statistical measure of saved simulation files.
360 Single- or multiprocessing possible with definition of n_cpu.
361 """
362 if n_cpu == 1:
363 results = _load_files(_filepaths, self.parquet_engine)
364 output_array, output_verbose = self.eval_statistical_measure(
365 cal_class=cal_class,
366 results=results
367 )
368 return output_array, output_verbose
369 output_array, output_verbose = self._mp_load_eval(_filepaths, cal_class, n_cpu)
370 return output_array, output_verbose
372 def run(self, calibration_classes, merge_multiple_classes=True, **kwargs):
373 """
374 Execute the sensitivity analysis for each class and
375 return the result.
377 :param CalibrationClass,list calibration_classes:
378 Either one or multiple classes for calibration with same tuner-parameters.
379 :param bool merge_multiple_classes:
380 Default True. If False, the given list of calibration-classes
381 is handled as-is. This means if you pass two CalibrationClass objects
382 with the same name (e.g. "device on"), the calibration process will run
383 for both these classes stand-alone.
384 This will automatically yield an intersection of tuner-parameters, however may
385 have advantages in some cases.
386 :keyword bool verbose:
387 Default False. If True, in addition to the combined Goals of the Classes
388 (saved under index Goal: all), the sensitivity measures of the individual
389 Goals will also be calculated and returned.
390 :keyword scale:
391 Default is False. If True the bounds of the tuner-parameters
392 will be scaled between 0 and 1.
393 :keyword bool use_fist_sim:
394 Default False. If True, the simulations of the first calibration class will be used for
395 all other calibration classes with their relevant time intervals.
396 The simulations must be stored on a hard-drive, so it must be used with
397 either save_files or load_files.
398 :keyword int n_cpu:
399 Default is 1. The number of processes to use for the evaluation of the statistical
400 measure. For n_cpu > 1 only one simulation file is loaded at once in a process and
401 dumped directly after the evaluation of the statistical measure,
402 so that only minimal memory is used.
403 Use this option for large analyses.
404 Only implemented for save_files=True or load_sim_files=True.
405 :keyword bool load_sim_files:
406 Default False. If True, no new simulations are done and old simulations are loaded.
407 The simulations and corresponding samples will be loaded from self.savepath_sim like
408 they were saved from self.save_files. Currently, the name of the sim folder must be
409 "simulations_CAL_CLASS_NAME" and for the samples "samples_CAL_CLASS_NAME".
410 The usage of the same simulations for different
411 calibration classes is not supported yet.
412 :keyword bool save_results:
413 Default True. If True, all results are saved as a csv in cd.
414 (samples, statistical measures and analysis variables).
415 :keyword bool plot_result:
416 Default True. If True, the results will be plotted.
417 :return:
418 Returns a pandas.DataFrame. The DataFrame has a Multiindex with the
419 levels Class, Goal and Analysis variable. The Goal name of combined goals is 'all'.
420 The variables are the tuner-parameters.
421 For the Sobol Method and calc_second_order returns a tuple of DataFrames (df_1, df_2)
422 where df_2 contains the second oder analysis variables and has an extra index level
423 Interaction, which also contains the variables.
424 :rtype: pandas.DataFrame
425 """
426 verbose = kwargs.pop('verbose', False)
427 scale = kwargs.pop('scale', False)
428 use_first_sim = kwargs.pop('use_first_sim', False)
429 n_cpu = kwargs.pop('n_cpu', 1)
430 save_results = kwargs.pop('save_results', True)
431 plot_result = kwargs.pop('plot_result', True)
432 load_sim_files = kwargs.pop('load_sim_files', False)
433 # Check correct input
434 calibration_classes = utils.validate_cal_class_input(calibration_classes)
435 # Merge the classes for avoiding possible intersection of tuner-parameters
436 if merge_multiple_classes:
437 calibration_classes = data_types.merge_calibration_classes(calibration_classes)
439 # Check n_cpu
440 if n_cpu > mp.cpu_count():
441 raise ValueError(f"Given n_cpu '{n_cpu}' is greater "
442 "than the available number of "
443 f"cpus on your machine '{mp.cpu_count()}'")
445 # Check if the usage of the simulations from the first calibration class for all is possible
446 if use_first_sim:
447 if not self.save_files and not load_sim_files:
448 raise AttributeError('To use the simulations of the first calibration class '
449 'for all classes the simulation files must be saved. '
450 'Either set save_files=True or load already exiting files '
451 'with load_sim_files=True.')
452 start_time = 0
453 stop_time = 0
454 for idx, cal_class in enumerate(calibration_classes):
455 if idx == 0:
456 start_time = cal_class.start_time
457 stop_time = cal_class.stop_time
458 continue
459 if start_time > cal_class.start_time or stop_time < cal_class.stop_time:
460 raise ValueError(f'To use the simulations of the first calibration class '
461 f'for all classes the start and stop times of the other '
462 f'classes must be in the interval [{start_time}, {stop_time}] '
463 f'of the first calibration class.')
465 all_results = []
466 for idx, cal_class in enumerate(calibration_classes):
468 self.logger.info('Start sensitivity analysis of class: %s, '
469 'Time-Interval: %s-%s s', cal_class.name,
470 cal_class.start_time, cal_class.stop_time)
472 # Generate list with metrics of every parameter variation
473 results_goals = {}
474 if load_sim_files:
475 self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
476 if use_first_sim:
477 class_name = calibration_classes[0].name
478 else:
479 class_name = cal_class.name
480 sim_dir = self.savepath_sim.joinpath(f'simulations_{class_name}')
481 samples_path = self.savepath_sim.joinpath(f'samples_{class_name}.csv')
482 self.logger.info(f'Loading samples from {samples_path}')
483 samples = pd.read_csv(samples_path,
484 header=0,
485 index_col=0)
486 samples = samples.to_numpy()
487 result_file_names = [f"simulation_{idx}.{self.suffix_files}" for idx in
488 range(len(samples))]
489 _filepaths = [sim_dir.joinpath(result_file_name) for result_file_name in
490 result_file_names]
491 self.logger.info(f'Loading simulation files from {sim_dir}')
492 output_array, output_verbose = self._load_eval(_filepaths, cal_class, n_cpu)
493 else:
494 results, samples = self.simulate_samples(
495 cal_class=cal_class,
496 scale=scale
497 )
498 if self.save_files:
499 output_array, output_verbose = self._load_eval(results, cal_class, n_cpu)
500 else:
501 output_array, output_verbose = self.eval_statistical_measure(
502 cal_class=cal_class,
503 results=results
504 )
505 if use_first_sim:
506 load_sim_files = True
508 # combine output_array and output_verbose
509 # set key for output_array depending on one or multiple goals
510 stat_mea = {'all': output_array}
511 if len(output_verbose) == 1:
512 stat_mea = output_verbose
513 if len(output_verbose) > 1 and verbose:
514 stat_mea.update(output_verbose)
516 # save statistical measure and corresponding samples for each cal_class in cd
517 if save_results:
518 result_file_names = [f"simulation_{idx}" for idx in range(len(output_array))]
519 stat_mea_df = pd.DataFrame(stat_mea, index=result_file_names)
520 savepath_stat_mea = self.cd.joinpath(
521 f'{cal_class.goals.statistical_measure}_{cal_class.name}.csv')
522 stat_mea_df.to_csv(savepath_stat_mea)
523 self.reproduction_files.append(savepath_stat_mea)
524 samples_df = pd.DataFrame(samples, columns=cal_class.tuner_paras.get_names(),
525 index=result_file_names)
526 savepath_samples = self.cd.joinpath(f'samples_{cal_class.name}.csv')
527 samples_df.to_csv(savepath_samples)
528 self.reproduction_files.append(savepath_samples)
530 self.logger.info('Starting calculation of analysis variables')
531 for key, val in stat_mea.items():
532 result_goal = self.analysis_function(
533 x=samples,
534 y=val
535 )
536 results_goals[key] = result_goal
537 all_results.append(results_goals)
538 self.logger.info('Finished sensitivity analysis of class: %s, '
539 'Time-Interval: %s-%s s', cal_class.name,
540 cal_class.start_time, cal_class.stop_time)
541 result = self._conv_local_results(results=all_results,
542 local_classes=calibration_classes)
543 if save_results:
544 self._save(result)
545 if plot_result:
546 self.plot(result)
547 return result, calibration_classes
549 def _save(self, result: pd.DataFrame, time_dependent: bool = False):
550 """
551 Saves the result DataFrame of run and run_time_dependent.
552 Needs to be overwritten for Sobol results.
553 """
554 if time_dependent:
555 savepath_result = self.cd.joinpath(f'{self.__class__.__name__}_results_time.csv')
556 else:
557 savepath_result = self.cd.joinpath(f'{self.__class__.__name__}_results.csv')
558 result.to_csv(savepath_result)
559 self.reproduction_files.append(savepath_result)
561 @staticmethod
562 def create_problem(tuner_paras, scale=False) -> dict:
563 """Create function for later access if multiple calibration-classes are used."""
564 num_vars = len(tuner_paras.get_names())
565 bounds = np.array(tuner_paras.get_bounds())
566 if scale:
567 bounds = [np.zeros_like(bounds[0]), np.ones_like(bounds[1])]
568 problem = {'num_vars': num_vars,
569 'names': tuner_paras.get_names(),
570 'bounds': np.transpose(bounds)}
571 return problem
573 @staticmethod
574 def select_by_threshold(calibration_classes, result, analysis_variable, threshold):
575 """
576 Automatically select sensitive tuner parameters based on a given threshold
577 of a given analysis variable from a sensitivity result.
578 Uses only the combined goals.
580 :param list calibration_classes:
581 List of aixcalibuha.data_types.CalibrationClass objects that you want to
582 automatically select sensitive tuner-parameters.
583 :param pd.DataFrame result:
584 Result object of sensitivity analysis run
585 :param str analysis_variable:
586 Analysis variable to use for the selection
587 :param float threshold:
588 Minimal required value of given key
589 :return: list calibration_classes
590 """
591 for cal_class in calibration_classes:
592 first_goal = result.index.get_level_values(1)[0]
593 class_result = result.loc[cal_class.name, first_goal, analysis_variable]
594 tuner_paras = copy.deepcopy(cal_class.tuner_paras)
595 select_names = class_result[class_result < threshold].index.values
596 tuner_paras.remove_names(select_names)
597 if not tuner_paras.get_names():
598 raise ValueError(
599 'Automatic selection removed all tuner parameter '
600 f'from class {cal_class.name} after '
601 'SensitivityAnalysis was done. Please adjust the '
602 'threshold in json or manually chose tuner '
603 'parameters for the calibration.')
604 # cal_class.set_tuner_paras(tuner_paras)
605 cal_class.tuner_paras = tuner_paras
606 return calibration_classes
608 @staticmethod
609 def select_by_threshold_verbose(calibration_class: CalibrationClass,
610 result: pd.DataFrame,
611 analysis_variable: str,
612 threshold: float,
613 calc_names_for_selection: List[str] = None):
614 """
615 Select tuner-parameters of single calibration class with verbose sensitivity results.
616 This function selects tuner-parameters if their sensitivity is equal or greater
617 than the threshold in just one target value of one calibration class in the
618 sensitivity result. This can be more robust because a small sensitivity in one target
619 value and state of the system can mean that the parameter can also be calibrated in
620 a global calibration class which calibrates multiple states and target values at
621 the same time and has there not directly the same sensitivity as in the isolated
622 view of a calibration class for only one state.
624 :param CalibrationClass calibration_class:
625 The calibration class from which the tuner parameters will be selected.
626 :param pd.DataFrame result:
627 Sensitivity results to use for the selection. Can include multiple classes.
628 :param str analysis_variable:
629 The analysis variable to use for the selection.
630 :param float threshold:
631 Minimal required value of given analysis variable.
632 :param List[str] calc_names_for_selection:
633 Specifies which calibration classes in the sensitivity results will be used for
634 the selection. Default are all classes.
635 """
636 if Counter(calibration_class.tuner_paras.get_names()) != Counter(list(result.columns)):
637 raise NameError("The tuner-parameter of the calibration class do not "
638 "match the tuner-parameters in the sensitivity result."
639 "They have to match.")
641 result = result.loc[:, :, analysis_variable]
642 calc_names_results = result.index.get_level_values("Class").unique()
643 if calc_names_for_selection:
644 for cal_class in calc_names_for_selection:
645 if cal_class not in calc_names_results:
646 raise NameError(f"The calibration class name {cal_class} "
647 f"does not match any class name "
648 f"in the given sensitivity result.")
649 result = result.loc[calc_names_for_selection, :, :]
651 selected_tuners = (result >= threshold).any()
653 remove_tuners = []
654 for tuner, selected in selected_tuners.items():
655 if not selected:
656 remove_tuners.append(tuner)
657 tuner_paras = copy.deepcopy(calibration_class.tuner_paras)
658 tuner_paras.remove_names(remove_tuners)
659 if not tuner_paras.get_names():
660 raise ValueError("Threshold to small. All tuner-parameters would be removed.")
661 calibration_class.tuner_paras = tuner_paras
662 return calibration_class
664 def run_time_dependent(self, cal_class: CalibrationClass, **kwargs):
665 """
666 Calculate the time dependent sensitivity for all the single goals in the calibration class.
668 :param CalibrationClass cal_class:
669 Calibration class with tuner-parameters to calculate sensitivity for.
670 Can include dummy target date.
671 :keyword scale:
672 Default is False. If True the bounds of the tuner-parameters
673 will be scaled between 0 and 1.
674 :keyword bool load_sim_files:
675 Default False. If True, no new simulations are done and old simulations are loaded.
676 The simulations and corresponding samples will be loaded from self.savepath_sim like
677 they were saved from self.save_files. Currently, the name of the sim folder must be
678 "simulations_CAL_CLASS_NAME" and for the samples "samples_CAL_CLASS_NAME".
679 :keyword bool save_results:
680 Default True. If True, all results are saved as a csv in cd.
681 (samples and analysis variables).
682 :keyword int n_steps:
683 Default is all time steps. If the problem is large, the evaluation of all time steps
684 at once can cause a memory error. Then n_steps defines how many time_steps
685 are evaluated at once in chunks. This increases the needed time exponentially and
686 the simulation files must be saved.
687 :keyword bool plot_result:
688 Default True. If True, the results will be plotted.
689 :return:
690 Returns a pandas.DataFrame.
691 :rtype: pandas.DataFrame
692 """
693 scale = kwargs.pop('scale', False)
694 save_results = kwargs.pop('save_results', True)
695 plot_result = kwargs.pop('plot_result', True)
696 load_sim_files = kwargs.pop('load_sim_files', False)
697 n_steps = kwargs.pop('n_steps', 'all')
699 self.logger.info("Start time dependent sensitivity analysis.")
700 if load_sim_files:
701 self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
702 sim_dir = self.savepath_sim.joinpath(f'simulations_{cal_class.name}')
703 samples_path = self.savepath_sim.joinpath(f'samples_{cal_class.name}.csv')
704 samples = pd.read_csv(samples_path,
705 header=0,
706 index_col=0)
707 samples = samples.to_numpy()
708 result_file_names = [f"simulation_{idx}.{self.suffix_files}" for idx in
709 range(len(samples))]
710 _filepaths = [sim_dir.joinpath(result_file_name) for result_file_name in
711 result_file_names]
713 sen_time_dependent_list, time_index = self._load_analyze_tsteps(_filepaths=_filepaths,
714 samples=samples,
715 n_steps=n_steps,
716 cal_class=cal_class)
717 sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list, time_index)
718 else:
719 results, samples = self.simulate_samples(
720 cal_class=cal_class,
721 scale=scale
722 )
723 if self.save_files:
724 sen_time_dependent_list, time_index = self._load_analyze_tsteps(_filepaths=results,
725 samples=samples,
726 n_steps=n_steps,
727 cal_class=cal_class)
728 sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list,
729 time_index)
730 else:
731 variables = results[0].get_variable_names()
732 time_index = results[0].index.to_numpy()
733 total_result = _concat_all_sims(results)
734 sen_time_dependent_list = []
735 for time_step in time_index:
736 result_df_tstep = self._analyze_tstep_df(time_step=time_step,
737 tsteps_sim_results=total_result,
738 variables=variables,
739 samples=samples,
740 cal_class=cal_class)
741 sen_time_dependent_list.append(result_df_tstep)
742 sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list,
743 time_index)
744 self.logger.info("Finished time dependent sensitivity analysys.")
745 if save_results:
746 self._save(sen_time_dependent_df, time_dependent=True)
747 if plot_result:
748 if isinstance(sen_time_dependent_df, pd.DataFrame):
749 plot_time_dependent(sen_time_dependent_df)
750 else:
751 plot_time_dependent(sen_time_dependent_df[0])
752 return sen_time_dependent_df
754 def _analyze_tstep_df(self, time_step, tsteps_sim_results, variables, samples, cal_class):
755 """Analyze the sensitivity at a single time step."""
756 result_dict_tstep = {}
757 for var in variables:
758 result_tstep_var = tsteps_sim_results[var].loc[time_step].to_numpy()
759 if np.all(result_tstep_var == result_tstep_var[0]):
760 sen_tstep_var = None
761 else:
762 sen_tstep_var = self.analysis_function(
763 x=samples,
764 y=result_tstep_var
765 )
766 result_dict_tstep[var] = sen_tstep_var
767 result_df_tstep = self._conv_local_results(results=[result_dict_tstep],
768 local_classes=[cal_class])
769 return result_df_tstep
771 def _load_tsteps_df(self, tsteps, _filepaths):
772 """
773 Load all simulations and extract and concat the sim results of the time steps in tsteps.
774 """
775 self.logger.info(
776 f"Loading time steps from {tsteps[0]} to {tsteps[-1]} of the simulation files.")
777 tsteps_sim_results = []
778 for _filepath in _filepaths:
779 sim = _load_single_file(_filepath)
780 tsteps_sim_results.append(sim.loc[tsteps[0]:tsteps[-1]])
781 tsteps_sim_results = _concat_all_sims(tsteps_sim_results)
782 return tsteps_sim_results
784 def _load_analyze_tsteps(self, _filepaths, samples, n_steps, cal_class):
785 """
786 Load and analyze all time steps in chunks with n_steps time steps.
787 """
788 sim1 = _load_single_file(_filepaths[0])
789 time_index = sim1.index.to_numpy()
790 variables = sim1.get_variable_names()
791 sen_time_dependent_list = []
792 if n_steps == 'all':
793 list_tsteps = [time_index]
794 elif isinstance(n_steps, int) and not (n_steps <= 0 or n_steps > len(time_index)):
795 list_tsteps = _divide_chunks(time_index, n_steps)
796 else:
797 raise ValueError(
798 f"n_steps can only be between 1 and {len(time_index)} or the string all.")
800 for tsteps in list_tsteps:
801 tsteps_sim_results = self._load_tsteps_df(tsteps=tsteps, _filepaths=_filepaths)
802 self.logger.info("Analyzing these time steps.")
803 for tstep in tsteps:
804 result_df_tstep = self._analyze_tstep_df(time_step=tstep,
805 tsteps_sim_results=tsteps_sim_results,
806 variables=variables,
807 samples=samples,
808 cal_class=cal_class)
809 sen_time_dependent_list.append(result_df_tstep)
810 return sen_time_dependent_list, time_index
812 def _conv_global_result(self, result: dict, cal_class: CalibrationClass,
813 analysis_variable: str):
814 glo_res_dict = self._get_res_dict(result=result, cal_class=cal_class,
815 analysis_variable=analysis_variable)
816 return pd.DataFrame(glo_res_dict, index=['global'])
818 def _conv_local_results(self, results: list, local_classes: list):
819 """
820 Convert the result dictionaries form SALib of each class and goal into one DataFrame.
821 Overwritten for Sobol.
822 """
823 _conv_results = []
824 tuples = []
825 for class_results, local_class in zip(results, local_classes):
826 for goal, goal_results in class_results.items():
827 for analysis_var in self.analysis_variables:
828 _conv_results.append(self._get_res_dict(result=goal_results,
829 cal_class=local_class,
830 analysis_variable=analysis_var))
831 tuples.append((local_class.name, goal, analysis_var))
832 index = pd.MultiIndex.from_tuples(tuples=tuples,
833 names=['Class', 'Goal', 'Analysis variable'])
834 df = pd.DataFrame(_conv_results, index=index)
835 return df
837 @abc.abstractmethod
838 def _get_res_dict(self, result: dict, cal_class: CalibrationClass, analysis_variable: str):
839 """
840 Convert the result object to a dict with the key
841 being the variable name and the value being the result
842 associated to analysis_variable.
843 """
844 raise NotImplementedError
846 def plot(self, result):
847 """
848 Plot the results of the sensitivity analysis method from run().
850 :param pd.DataFrame result:
851 Dataframe of the results like from the run() function.
852 :return tuple of matplotlib objects (fig, ax):
853 """
854 plot_single(result=result)
856 @staticmethod
857 def load_from_csv(path):
858 """
859 Load sensitivity results which were saved with the run() or run_time_dependent() function.
861 For second order results use the load_second_order_from_csv() function of the SobolAnalyzer.
862 """
863 result = pd.read_csv(path, index_col=[0, 1, 2])
864 return result
866 def save_for_reproduction(self,
867 title: str,
868 path: pathlib.Path = None,
869 files: list = None,
870 exclude_sim_files: bool = False,
871 remove_saved_files: bool = False,
872 **kwargs):
873 """
874 Save the settings of the SenAnalyzer and SimApi in order to
875 reproduce the simulations and sensitivity analysis method.
876 All saved results will be also saved in the reproduction
877 archive. The simulations can be excluded from saving.
879 :param str title:
880 Title of the study
881 :param pathlib.Path path:
882 Where to store the .zip file. If not given, self.cd is used.
883 :param list files:
884 List of files to save along the standard ones.
885 Examples would be plots, tables etc.
886 :param bool exclude_sim_files:
887 Default False. If True, the simulation files will not be saved in
888 the reproduction archive.
889 :param bool remove_saved_files:
890 Default False. If True, the result and simulation files will be moved
891 instead of just copied.
892 :param dict kwargs:
893 All keyword arguments except title, files, and path of the function
894 `save_reproduction_archive`. Most importantly, `log_message` may be
895 specified to avoid input during execution.
896 """
897 if files is None:
898 files = []
900 for file_path in self.reproduction_files:
901 if exclude_sim_files:
902 if 'simulation' in str(file_path):
903 continue
904 filename = "SenAnalyzer" + str(file_path).rsplit(self.cd.name, maxsplit=1)[-1]
905 files.append(CopyFile(
906 sourcepath=file_path,
907 filename=filename,
908 remove=remove_saved_files
909 ))
911 return self.sim_api.save_for_reproduction(
912 title=title,
913 path=path,
914 files=files,
915 **kwargs
916 )