Coverage for aixcalibuha/sensitivity_analysis/sensitivity_analyzer.py: 92%
424 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-04-30 14:23 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2026-04-30 14:23 +0000
1"""Package containing modules for sensitivity analysis.
2The module contains the relevant base-classes."""
3import abc
4import copy
5import os
6from pathlib import Path
7import multiprocessing as mp
8import warnings
9from typing import List
10from collections import Counter
11import numpy as np
12import pandas as pd
13from ebcpy import load_time_series_data
14from ebcpy.utils import setup_logger
15from ebcpy.utils.reproduction import CopyFile
16from ebcpy.simulationapi import SimulationAPI
17from aixcalibuha import CalibrationClass, data_types
18from aixcalibuha.utils import validate_cal_class_input, convert_mat_to_suffix, empty_postprocessing
19from aixcalibuha.sensitivity_analysis.plotting import plot_single, plot_time_dependent
22def _load_single_file(_filepath, parquet_engine='pyarrow'):
23 """Helper function"""
24 if _filepath is None:
25 return None
26 return load_time_series_data(_filepath, key='simulation', engine=parquet_engine)
29def _load_files(_filepaths, parquet_engine='pyarrow'):
30 """Helper function"""
31 results = []
32 for _filepath in _filepaths:
33 results.append(_load_single_file(_filepath, parquet_engine=parquet_engine))
34 return results
37def _restruct_verbose(list_output_verbose):
38 """Helper function"""
39 output_verbose = {}
40 for key, val in list_output_verbose[0].items():
41 output_verbose[key] = np.array([])
42 for i in list_output_verbose:
43 for key, val in i.items():
44 output_verbose[key] = np.append(output_verbose[key], np.array([val[1]]))
45 return output_verbose
48def _concat_all_sims(sim_results_list):
49 """Helper function that concat all results in a list to one DataFrame."""
50 sim_results_list = [r for r in sim_results_list]
51 sim_results_list = pd.concat(sim_results_list, keys=range(len(sim_results_list)),
52 axis='columns')
53 sim_results_list = sim_results_list.swaplevel(axis=1).sort_index(axis=1)
54 return sim_results_list
def _restruct_time_dependent(sen_time_dependent_list, time_index):
    """Helper function that restructures the time dependent sensitivity results.

    :param list sen_time_dependent_list:
        One sensitivity-result DataFrame per time step. For second-order
        results each entry is a ``(first_order_df, second_order_df)`` tuple.
    :param time_index:
        Iterable of time stamps matching ``sen_time_dependent_list`` in order,
        used as the new 'time' index level.
    :return:
        A DataFrame indexed by (Goal, Analysis variable, time) — or a tuple of
        two DataFrames when second-order results are present, the second one
        carrying an extra 'Interaction' level.
    """

    def _restruct_single(sen_time_dependent_list_s, second_order=False):
        # Stack all per-time-step frames; 'keys' adds the time stamp as the
        # outermost index level.
        sen_time_dependent_df = pd.concat(sen_time_dependent_list_s, keys=time_index, axis=0)
        # Each per-step frame only holds one class, so the 'Class' level is
        # redundant here and dropped.
        sen_time_dependent_df = sen_time_dependent_df.droplevel('Class', axis='index')
        # Move the time level from the front to the back:
        # (time, Goal, ...) -> (Goal, ..., time).
        sen_time_dependent_df = sen_time_dependent_df.swaplevel(0, 1)
        sen_time_dependent_df = sen_time_dependent_df.swaplevel(1, 2).sort_index(axis=0)
        if second_order:
            # Second-order frames carry one more level (the interaction
            # partner); one extra swap pushes 'time' to the innermost position.
            sen_time_dependent_df = sen_time_dependent_df.swaplevel(2, 3).sort_index(axis=0)
            sen_time_dependent_df.index.set_names(
                ['Goal', 'Analysis variable', 'Interaction', 'time'], inplace=True)
        else:
            sen_time_dependent_df.index.set_names(['Goal', 'Analysis variable', 'time'],
                                                  inplace=True)
        return sen_time_dependent_df

    # Tuples mean (first_order, second_order) results per time step (Sobol
    # with calc_second_order); restructure both parts separately.
    if isinstance(sen_time_dependent_list[0], tuple):
        sen_time_dependent_list1, sen_time_dependent_list2 = zip(*sen_time_dependent_list)
        return _restruct_single(sen_time_dependent_list1), _restruct_single(
            sen_time_dependent_list2, True)
    return _restruct_single(sen_time_dependent_list)
81def _divide_chunks(long_list, chunk_length):
82 """Helper function that divides all list into multiple list with a specific chunk length."""
83 for i in range(0, len(long_list), chunk_length):
84 yield long_list[i:i + chunk_length]
87class SenAnalyzer(abc.ABC):
88 """
89 Class to perform a Sensitivity Analysis.
91 :param SimulationAPI sim_api:
92 Simulation-API used to simulate the samples
93 :param int num_samples:
94 The parameter `N` to the sampler methods of sobol and morris. NOTE: This is not the
95 number of samples produced, but relates to the total number of samples produced in
96 a manner dependent on the sampler method used. See the documentation of the specific
97 method in the SALib for more information.
98 :keyword str,Path working_directory:
99 The path for the current working directory.
100 Logger and results will be stored here.
:keyword boolean fail_on_error:
    Default is True. If True, the calibration will stop with an error if
    the simulation fails. See also: ``ret_val_on_error``
104 :keyword float,np.nan ret_val_on_error:
105 Default is np.nan. If ``fail_on_error`` is false, you can specify here
which value to return in the case of a failed simulation. Possible
options are np.nan, np.inf or some other high numbers. Be aware that this
may influence the solver.
109 :keyword boolean save_files:
110 Default False. If true, all simulation files for each iteration will be saved!
111 :keyword str suffix_files:
112 Default 'csv'. Specifies the data format to store the simulation files in.
113 Options are 'csv', 'parquet', or 'parquet.COMPRESSION' (e.g. 'parquet.snappy',
114 'parquet.gzip') to save only the goals.
115 If you want to keep the original 'mat' file specify 'mat' here
116 (not recommended due to high disk size usage).
117 :keyword str parquet_engine:
118 The engine to use for the data format parquet.
119 Supported options can be extracted
120 from the ebcpy DataFrame accessor ``df.tsd.save()`` function.
121 Default is 'pyarrow'.
122 :keyword str,Path savepath_sim:
123 Default is working_directory. Own directory for the time series data sets of all simulations
124 during the sensitivity analysis. The own dir can be necessary for large data sets,
125 because they can crash IDE during indexing when they are in the project folder.
127 """
    def __init__(self,
                 sim_api: SimulationAPI,
                 num_samples: int,
                 **kwargs):
        """Instantiate class parameters"""
        # Setup the instance attributes
        self.sim_api = sim_api
        self.num_samples = num_samples

        # Extract optional keyword arguments (see class docstring).
        # fail_on_error=True in code: a failed simulation raises instead of
        # returning ret_val_on_error.
        self.fail_on_error = kwargs.pop("fail_on_error", True)
        self.save_files = kwargs.pop("save_files", False)
        self.suffix_files = kwargs.pop('suffix_files', 'csv')
        self.parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
        self.ret_val_on_error = kwargs.pop("ret_val_on_error", np.nan)
        self.working_directory = kwargs.pop("working_directory", os.getcwd())

        # Backwards compatibility: "cd" is the deprecated name of
        # working_directory and overrides it when given.
        if "cd" in kwargs:
            warnings.warn(
                "cd was renamed to working_directory in all classes. "
                "Use working_directory instead.",
                category=DeprecationWarning)
            self.working_directory = kwargs.pop("cd")
        # Simulation result files default to the working directory unless a
        # dedicated directory is given (useful for large data sets).
        self.savepath_sim = kwargs.pop('savepath_sim', self.working_directory)

        # Normalize both paths to pathlib.Path and make sure the working
        # directory exists (logger and results are written there).
        if isinstance(self.working_directory, str):
            self.working_directory = Path(self.working_directory)
        if not self.working_directory.exists():
            self.working_directory.mkdir(parents=True, exist_ok=True)

        if isinstance(self.savepath_sim, str):
            self.savepath_sim = Path(self.savepath_sim)

        # Setup the logger
        self.logger = setup_logger(working_directory=self.working_directory,
                                   name=self.__class__.__name__)

        # Setup default values
        self.problem: dict = None  # SALib problem dict, filled by create_problem()
        self.reproduction_files = []  # paths collected for reproduction archives
    @property
    @abc.abstractmethod
    def analysis_variables(self) -> List[str]:
        """
        Indicate which variables are
        able to be selected for analysis

        :return:
            A list of strings
        :rtype: List[str]
        """
        # Concrete analyzers must override this property; this body is only
        # reached if a subclass forgot to implement it.
        raise NotImplementedError(f'{self.__class__.__name__}.analysis_variables '
                                  f'property is not defined yet')
    @abc.abstractmethod
    def analysis_function(self, x, y):
        """
        Use the method to analyze the simulation results.

        :param np.array x:
            the `X` parameter of the method (The NumPy matrix containing the model inputs)
        :param np.array y:
            The NumPy array containing the model outputs
        """
        # Concrete analyzers (e.g. Sobol/Morris) must override this method.
        raise NotImplementedError(f'{self.__class__.__name__}.analysis_function '
                                  f'function is not defined yet')
198 @abc.abstractmethod
199 def create_sampler_demand(self) -> dict:
200 """
201 Return the sampler parameters
203 :return:
204 dict: A dict with the sampler demand
205 """
206 raise NotImplementedError(f'{self.__class__.__name__}.analysis_function '
207 f'function is not defined yet')
    @abc.abstractmethod
    def generate_samples(self):
        """
        Run the sampler specified by `method` and return the results.

        :return:
            The list of samples generated as a NumPy array with one row per sample
            and each row containing one value for each variable name in `problem['names']`.
        :rtype: np.ndarray
        """
        # Concrete analyzers must override this; it relies on self.problem
        # being set beforehand (see simulate_samples / create_problem).
        raise NotImplementedError(f'{self.__class__.__name__}.generate_samples '
                                  f'function is not defined yet')
    def simulate_samples(self, cal_class, **kwargs):
        """
        Creates the samples for the calibration class and simulates them.

        :param cal_class:
            One class for calibration. Goals and tuner_paras have to be set
        :keyword scale:
            Default is False. If True the bounds of the tuner-parameters
            will be scaled between 0 and 1.

        :return:
            Returns two lists. First a list with the simulation results for each sample.
            If save_files the list contains the filepaths to the results
            Second a list of the samples.
        :rtype: list
        """
        scale = kwargs.pop('scale', False)
        # Set the output interval according the given Goals
        mean_freq = cal_class.goals.get_meas_frequency()
        self.logger.info("Setting output_interval of simulation according "
                         "to measurement target data frequency: %s", mean_freq)
        self.sim_api.sim_setup.output_interval = mean_freq
        initial_names = cal_class.tuner_paras.get_names()
        # Restrict the simulation to the class interval and request only the
        # variables the goals actually compare against.
        self.sim_api.set_sim_setup({"start_time": cal_class.start_time,
                                    "stop_time": cal_class.stop_time})
        self.sim_api.result_names = cal_class.goals.get_sim_var_names()

        # Build the SALib problem definition and draw the samples with the
        # method-specific sampler of the concrete analyzer.
        self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
        samples = self.generate_samples()

        # create df of samples with the result_file_names as the index
        result_file_names = [f"simulation_{idx}" for idx in range(len(samples))]
        samples_df = pd.DataFrame(samples, columns=initial_names, index=result_file_names)
        samples_df.to_csv(self.working_directory.joinpath(f'samples_{cal_class.name}.csv'))

        # Simulate the current values
        parameters = []
        for initial_values in samples:
            if scale:
                # Samples were drawn in [0, 1]; map them back to real bounds.
                initial_values = cal_class.tuner_paras.descale(initial_values)
            parameters.append(dict(zip(initial_names, initial_values)))

        self.logger.info('Starting %s parameter variations on %s cores',
                         len(samples), self.sim_api.n_cpu)
        if self.save_files:
            sim_dir = self.savepath_sim.joinpath(f'simulations_{cal_class.name}')
            os.makedirs(sim_dir, exist_ok=True)
            # Keep a copy of the samples next to the simulation files so they
            # can be reloaded later via load_sim_files.
            samples_df.to_csv(self.savepath_sim.joinpath(f'samples_{cal_class.name}.csv'))
            self.logger.info(f'Saving simulation files in: {sim_dir}')
            if self.suffix_files == "mat":
                # Keep the raw .mat files; no conversion postprocessing.
                postprocess_mat_result = empty_postprocessing
                kwargs_postprocessing = {}
            else:
                # Convert each .mat result to the requested format (csv /
                # parquet) right after simulation to save disk space.
                postprocess_mat_result = convert_mat_to_suffix
                kwargs_postprocessing = {
                    'variable_names': self.sim_api.result_names,
                    'suffix_files': self.suffix_files,
                    'parquet_engine': self.parquet_engine
                }
            # Only DymolaAPI supports the postprocessing hooks; other APIs
            # would reject the extra kwargs.
            if self.sim_api.__class__.__name__ == "DymolaAPI":
                cal_class.input_kwargs["postprocess_mat_result"] = postprocess_mat_result
                cal_class.input_kwargs["kwargs_postprocessing"] = kwargs_postprocessing
            _filepaths = self.sim_api.simulate(
                parameters=parameters,
                return_option="savepath",
                savepath=sim_dir,
                result_file_name=result_file_names,
                fail_on_error=self.fail_on_error,
                inputs=cal_class.inputs,
                **cal_class.input_kwargs
            )
            self.reproduction_files.extend(_filepaths)
            results = _filepaths
        else:
            # Keep results in memory; returns the loaded time series directly.
            results = self.sim_api.simulate(
                parameters=parameters,
                inputs=cal_class.inputs,
                fail_on_error=self.fail_on_error,
                **cal_class.input_kwargs
            )
        self.logger.info('Finished %s simulations', len(samples))
        return results, samples
    def _check_index(self, tsd: pd.DataFrame, sim_num=None):
        """
        Ensure a simulation result is equally spaced at the configured output interval.

        :param pd.DataFrame tsd:
            Simulation result time series (uses the ebcpy ``tsd`` accessor).
        :param sim_num:
            Simulation name used in log messages only; when None, the file
            name of the result (if available) is used instead.
        :return:
            The possibly cleaned and/or rounded time series data.
        """
        # NOTE(review): usage below suggests frequency is a (mean, std) pair
        # of the index spacing -- confirm against the ebcpy tsd accessor docs.
        freq = tsd.tsd.frequency
        if sim_num is None:
            # Fall back to the result's file name for log output.
            sim_num = getattr(tsd.tsd, 'filepath', None)
            sim_num = sim_num.name if sim_num is not None else 'unknown'
        if freq[0] != self.sim_api.sim_setup.output_interval:
            self.logger.info(
                f'The mean value of the frequency from {sim_num} does not match output '
                'interval index will be cleaned and spaced equally')
            # clean_and_space_equally works on a datetime index; convert,
            # resample to the output interval, and convert back to floats.
            tsd.tsd.to_datetime_index()
            desired_freq = f'{str(self.sim_api.sim_setup.output_interval * 1000)}ms'
            tsd = tsd.tsd.clean_and_space_equally(desired_freq, inplace=False)
            tsd.tsd.to_float_index()
            freq = tsd.tsd.frequency
        if freq[1] > 0.0:
            self.logger.info(f'The standard deviation of the frequency from {sim_num} is to high '
                             f'and will be rounded to the accuracy of the output interval')
            # str(interval)[::-1].find('.') counts the decimal places of the
            # output interval; round the index to exactly that precision.
            tsd.index = np.round(tsd.index.astype("float64"),
                                 str(self.sim_api.sim_setup.output_interval)[::-1].find('.'))
        return tsd
326 def _single_eval_statistical_measure(self, kwargs_eval):
327 """Evaluates statistical measure of one result"""
328 cal_class = kwargs_eval.pop('cal_class')
329 result = kwargs_eval.pop('result')
330 num_sim = kwargs_eval.pop('sim_num', None)
331 if result is None:
332 verbose_error = {}
333 for goal, weight in zip(cal_class.goals.get_goals_list(), cal_class.goals.weightings):
334 verbose_error[goal] = (weight, self.ret_val_on_error)
335 return self.ret_val_on_error, verbose_error
336 result = self._check_index(result, num_sim)
337 cal_class.goals.set_sim_target_data(result)
338 cal_class.goals.set_relevant_time_intervals(cal_class.relevant_intervals)
339 # Evaluate the current objective
340 total_res, verbose_calculation = cal_class.goals.eval_difference(verbose=True)
341 return total_res, verbose_calculation
343 def eval_statistical_measure(self, cal_class, results, verbose=True):
344 """Evaluates statistical measures of results on single core"""
345 self.logger.info('Starting evaluation of statistical measure')
346 output = []
347 list_output_verbose = []
348 for i, result in enumerate(results):
349 total_res, verbose_calculation = self._single_eval_statistical_measure(
350 {'cal_class': cal_class, 'result': result, 'sim_num': f'simulation_{i}'}
351 )
352 output.append(total_res)
353 list_output_verbose.append(verbose_calculation)
354 if verbose:
355 # restructure output_verbose
356 output_verbose = _restruct_verbose(list_output_verbose)
357 return np.asarray(output), output_verbose
358 return np.asarray(output)
360 def _single_load_eval_file(self, kwargs_load_eval):
361 """For multiprocessing"""
362 filepath = kwargs_load_eval.pop('filepath')
363 _result = _load_single_file(filepath, self.parquet_engine)
364 kwargs_load_eval.update({'result': _result})
365 total_res, verbose_calculation = self._single_eval_statistical_measure(kwargs_load_eval)
366 return total_res, verbose_calculation
368 def _mp_load_eval(self, _filepaths, cal_class, n_cpu):
369 """
370 Loading and evaluating the statistical measure of saved simulation files on multiple cores
371 """
372 self.logger.info(f'Load files and evaluate statistical measure on {n_cpu} processes.')
373 kwargs_load_eval = []
374 for filepath in _filepaths:
375 kwargs_load_eval.append({'filepath': filepath, 'cal_class': cal_class})
376 output_array = []
377 list_output_verbose = []
378 with mp.Pool(processes=n_cpu) as pool:
379 for total, verbose in pool.imap(self._single_load_eval_file, kwargs_load_eval):
380 output_array.append(total)
381 list_output_verbose.append(verbose)
382 output_array = np.asarray(output_array)
383 output_verbose = _restruct_verbose(list_output_verbose)
384 return output_array, output_verbose
386 def _load_eval(self, _filepaths, cal_class, n_cpu):
387 """
388 Loading and evaluating the statistical measure of saved simulation files.
389 Single- or multiprocessing possible with definition of n_cpu.
390 """
391 if n_cpu == 1:
392 results = _load_files(_filepaths, self.parquet_engine)
393 output_array, output_verbose = self.eval_statistical_measure(
394 cal_class=cal_class,
395 results=results
396 )
397 return output_array, output_verbose
398 output_array, output_verbose = self._mp_load_eval(_filepaths, cal_class, n_cpu)
399 return output_array, output_verbose
    def run(self, calibration_classes, merge_multiple_classes=True, **kwargs):
        """
        Execute the sensitivity analysis for each class and
        return the result.

        :param CalibrationClass,list calibration_classes:
            Either one or multiple classes for calibration with same tuner-parameters.
        :param bool merge_multiple_classes:
            Default True. If False, the given list of calibration-classes
            is handled as-is. This means if you pass two CalibrationClass objects
            with the same name (e.g. "device on"), the calibration process will run
            for both these classes stand-alone.
            This will automatically yield an intersection of tuner-parameters, however may
            have advantages in some cases.
        :keyword bool verbose:
            Default False. If True, in addition to the combined Goals of the Classes
            (saved under index Goal: all), the sensitivity measures of the individual
            Goals will also be calculated and returned.
        :keyword scale:
            Default is False. If True the bounds of the tuner-parameters
            will be scaled between 0 and 1.
        :keyword bool use_first_sim:
            Default False. If True, the simulations of the first calibration class will be used for
            all other calibration classes with their relevant time intervals.
            The simulations must be stored on a hard-drive, so it must be used with
            either save_files or load_files.
        :keyword int n_cpu:
            Default is 1. The number of processes to use for the evaluation of the statistical
            measure. For n_cpu > 1 only one simulation file is loaded at once in a process and
            dumped directly after the evaluation of the statistical measure,
            so that only minimal memory is used.
            Use this option for large analyses.
            Only implemented for save_files=True or load_sim_files=True.
        :keyword bool load_sim_files:
            Default False. If True, no new simulations are done and old simulations are loaded.
            The simulations and corresponding samples will be loaded from self.savepath_sim like
            they were saved from self.save_files. Currently, the name of the sim folder must be
            "simulations_CAL_CLASS_NAME" and for the samples "samples_CAL_CLASS_NAME".
            The usage of the same simulations for different
            calibration classes is not supported yet.
        :keyword bool save_results:
            Default True. If True, all results are saved as a csv in working_directory.
            (samples, statistical measures and analysis variables).
        :keyword bool plot_result:
            Default True. If True, the results will be plotted.
        :return:
            Returns a pandas.DataFrame. The DataFrame has a Multiindex with the
            levels Class, Goal and Analysis variable. The Goal name of combined goals is 'all'.
            The variables are the tuner-parameters.
            For the Sobol Method and calc_second_order returns a tuple of DataFrames (df_1, df_2)
            where df_2 contains the second oder analysis variables and has an extra index level
            Interaction, which also contains the variables.
        :rtype: pandas.DataFrame
        """
        verbose = kwargs.pop('verbose', False)
        scale = kwargs.pop('scale', False)
        use_first_sim = kwargs.pop('use_first_sim', False)
        n_cpu = kwargs.pop('n_cpu', 1)
        save_results = kwargs.pop('save_results', True)
        plot_result = kwargs.pop('plot_result', True)
        load_sim_files = kwargs.pop('load_sim_files', False)
        # Check correct input
        calibration_classes = validate_cal_class_input(calibration_classes)
        # Merge the classes for avoiding possible intersection of tuner-parameters
        if merge_multiple_classes:
            calibration_classes = data_types.merge_calibration_classes(calibration_classes)

        # Check n_cpu
        if n_cpu > mp.cpu_count():
            raise ValueError(f"Given n_cpu '{n_cpu}' is greater "
                             "than the available number of "
                             f"cpus on your machine '{mp.cpu_count()}'")

        # Check if the usage of the simulations from the first calibration class for all is possible
        if use_first_sim:
            if not self.save_files and not load_sim_files:
                raise AttributeError('To use the simulations of the first calibration class '
                                     'for all classes the simulation files must be saved. '
                                     'Either set save_files=True or load already exiting files '
                                     'with load_sim_files=True.')
            # Every other class interval must lie inside the first class's
            # interval, otherwise its simulations cannot cover them.
            start_time = 0
            stop_time = 0
            for idx, cal_class in enumerate(calibration_classes):
                if idx == 0:
                    start_time = cal_class.start_time
                    stop_time = cal_class.stop_time
                    continue
                if start_time > cal_class.start_time or stop_time < cal_class.stop_time:
                    raise ValueError(f'To use the simulations of the first calibration class '
                                     f'for all classes the start and stop times of the other '
                                     f'classes must be in the interval [{start_time}, {stop_time}] '
                                     f'of the first calibration class.')

        all_results = []
        for idx, cal_class in enumerate(calibration_classes):

            self.logger.info('Start sensitivity analysis of class: %s, '
                             'Time-Interval: %s-%s s', cal_class.name,
                             cal_class.start_time, cal_class.stop_time)

            # Generate list with metrics of every parameter variation
            results_goals = {}
            if load_sim_files:
                self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
                # When reusing the first class's simulations, always read from
                # its folder regardless of the current class name.
                if use_first_sim:
                    class_name = calibration_classes[0].name
                else:
                    class_name = cal_class.name
                sim_dir = self.savepath_sim.joinpath(f'simulations_{class_name}')
                samples_path = self.savepath_sim.joinpath(f'samples_{class_name}.csv')
                self.logger.info(f'Loading samples from {samples_path}')
                samples = pd.read_csv(samples_path,
                                      header=0,
                                      index_col=0)
                samples = samples.to_numpy()
                # File names follow the convention used by simulate_samples.
                result_file_names = [f"simulation_{idx}.{self.suffix_files}" for idx in
                                     range(len(samples))]
                _filepaths = [sim_dir.joinpath(result_file_name) for result_file_name in
                              result_file_names]
                self.logger.info(f'Loading simulation files from {sim_dir}')
                output_array, output_verbose = self._load_eval(_filepaths, cal_class, n_cpu)
            else:
                results, samples = self.simulate_samples(
                    cal_class=cal_class,
                    scale=scale
                )
                if self.save_files:
                    # results holds filepaths here; load and evaluate them.
                    output_array, output_verbose = self._load_eval(results, cal_class, n_cpu)
                else:
                    output_array, output_verbose = self.eval_statistical_measure(
                        cal_class=cal_class,
                        results=results
                    )
                # After the first simulated class, subsequent classes reuse
                # those saved files by switching into the load branch.
                if use_first_sim:
                    load_sim_files = True

            # combine output_array and output_verbose
            # set key for output_array depending on one or multiple goals
            stat_mea = {'all': output_array}
            if len(output_verbose) == 1:
                stat_mea = output_verbose
            if len(output_verbose) > 1 and verbose:
                stat_mea.update(output_verbose)

            # save statistical measure and corresponding samples for each cal_class in working_directory
            if save_results:
                result_file_names = [f"simulation_{idx}" for idx in range(len(output_array))]
                stat_mea_df = pd.DataFrame(stat_mea, index=result_file_names)
                savepath_stat_mea = self.working_directory.joinpath(
                    f'{cal_class.goals.statistical_measure}_{cal_class.name}.csv')
                stat_mea_df.to_csv(savepath_stat_mea)
                self.reproduction_files.append(savepath_stat_mea)
                samples_df = pd.DataFrame(samples, columns=cal_class.tuner_paras.get_names(),
                                          index=result_file_names)
                savepath_samples = self.working_directory.joinpath(f'samples_{cal_class.name}.csv')
                samples_df.to_csv(savepath_samples)
                self.reproduction_files.append(savepath_samples)

            self.logger.info('Starting calculation of analysis variables')
            # One sensitivity analysis per goal ('all' plus individual goals
            # when verbose).
            for key, val in stat_mea.items():
                result_goal = self.analysis_function(
                    x=samples,
                    y=val
                )
                results_goals[key] = result_goal
            all_results.append(results_goals)
            self.logger.info('Finished sensitivity analysis of class: %s, '
                             'Time-Interval: %s-%s s', cal_class.name,
                             cal_class.start_time, cal_class.stop_time)
        result = self._conv_local_results(results=all_results,
                                          local_classes=calibration_classes)
        if save_results:
            self._save(result)
        if plot_result:
            self.plot(result)
        return result, calibration_classes
578 def _save(self, result: pd.DataFrame, time_dependent: bool = False):
579 """
580 Saves the result DataFrame of run and run_time_dependent.
581 Needs to be overwritten for Sobol results.
582 """
583 if time_dependent:
584 savepath_result = self.working_directory.joinpath(
585 f'{self.__class__.__name__}_results_time.csv')
586 else:
587 savepath_result = self.working_directory.joinpath(
588 f'{self.__class__.__name__}_results.csv')
589 result.to_csv(savepath_result)
590 self.reproduction_files.append(savepath_result)
592 @staticmethod
593 def create_problem(tuner_paras, scale=False) -> dict:
594 """Create function for later access if multiple calibration-classes are used."""
595 num_vars = len(tuner_paras.get_names())
596 bounds = np.array(tuner_paras.get_bounds())
597 if scale:
598 bounds = [np.zeros_like(bounds[0]), np.ones_like(bounds[1])]
599 problem = {'num_vars': num_vars,
600 'names': tuner_paras.get_names(),
601 'bounds': np.transpose(bounds)}
602 return problem
604 @staticmethod
605 def select_by_threshold(calibration_classes, result, analysis_variable, threshold):
606 """
607 Automatically select sensitive tuner parameters based on a given threshold
608 of a given analysis variable from a sensitivity result.
609 Uses only the combined goals.
611 :param list calibration_classes:
612 List of aixcalibuha.data_types.CalibrationClass objects that you want to
613 automatically select sensitive tuner-parameters.
614 :param pd.DataFrame result:
615 Result object of sensitivity analysis run
616 :param str analysis_variable:
617 Analysis variable to use for the selection
618 :param float threshold:
619 Minimal required value of given key
620 :return: list calibration_classes
621 """
622 for cal_class in calibration_classes:
623 first_goal = result.index.get_level_values(1)[0]
624 class_result = result.loc[cal_class.name, first_goal, analysis_variable]
625 tuner_paras = copy.deepcopy(cal_class.tuner_paras)
626 select_names = class_result[class_result < threshold].index.values
627 tuner_paras.remove_names(select_names)
628 if not tuner_paras.get_names():
629 raise ValueError(
630 'Automatic selection removed all tuner parameter '
631 f'from class {cal_class.name} after '
632 'SensitivityAnalysis was done. Please adjust the '
633 'threshold in json or manually chose tuner '
634 'parameters for the calibration.')
635 # cal_class.set_tuner_paras(tuner_paras)
636 cal_class.tuner_paras = tuner_paras
637 return calibration_classes
639 @staticmethod
640 def select_by_threshold_verbose(calibration_class: CalibrationClass,
641 result: pd.DataFrame,
642 analysis_variable: str,
643 threshold: float,
644 calc_names_for_selection: List[str] = None):
645 """
646 Select tuner-parameters of single calibration class with verbose sensitivity results.
647 This function selects tuner-parameters if their sensitivity is equal or greater
648 than the threshold in just one target value of one calibration class in the
649 sensitivity result. This can be more robust because a small sensitivity in one target
650 value and state of the system can mean that the parameter can also be calibrated in
651 a global calibration class which calibrates multiple states and target values at
652 the same time and has there not directly the same sensitivity as in the isolated
653 view of a calibration class for only one state.
655 :param CalibrationClass calibration_class:
656 The calibration class from which the tuner parameters will be selected.
657 :param pd.DataFrame result:
658 Sensitivity results to use for the selection. Can include multiple classes.
659 :param str analysis_variable:
660 The analysis variable to use for the selection.
661 :param float threshold:
662 Minimal required value of given analysis variable.
663 :param List[str] calc_names_for_selection:
664 Specifies which calibration classes in the sensitivity results will be used for
665 the selection. Default are all classes.
666 """
667 if Counter(calibration_class.tuner_paras.get_names()) != Counter(list(result.columns)):
668 raise NameError("The tuner-parameter of the calibration class do not "
669 "match the tuner-parameters in the sensitivity result."
670 "They have to match.")
672 result = result.loc[:, :, analysis_variable]
673 calc_names_results = result.index.get_level_values("Class").unique()
674 if calc_names_for_selection:
675 for cal_class in calc_names_for_selection:
676 if cal_class not in calc_names_results:
677 raise NameError(f"The calibration class name {cal_class} "
678 f"does not match any class name "
679 f"in the given sensitivity result.")
680 result = result.loc[calc_names_for_selection, :, :]
682 selected_tuners = (result >= threshold).any()
684 remove_tuners = []
685 for tuner, selected in selected_tuners.items():
686 if not selected:
687 remove_tuners.append(tuner)
688 tuner_paras = copy.deepcopy(calibration_class.tuner_paras)
689 tuner_paras.remove_names(remove_tuners)
690 if not tuner_paras.get_names():
691 raise ValueError("Threshold to small. All tuner-parameters would be removed.")
692 calibration_class.tuner_paras = tuner_paras
693 return calibration_class
    def run_time_dependent(self, cal_class: CalibrationClass, **kwargs):
        """
        Calculate the time dependent sensitivity for all the single goals in the calibration class.

        :param CalibrationClass cal_class:
            Calibration class with tuner-parameters to calculate sensitivity for.
            Can include dummy target date.
        :keyword scale:
            Default is False. If True the bounds of the tuner-parameters
            will be scaled between 0 and 1.
        :keyword bool load_sim_files:
            Default False. If True, no new simulations are done and old simulations are loaded.
            The simulations and corresponding samples will be loaded from self.savepath_sim like
            they were saved from self.save_files. Currently, the name of the sim folder must be
            "simulations_CAL_CLASS_NAME" and for the samples "samples_CAL_CLASS_NAME".
        :keyword bool save_results:
            Default True. If True, all results are saved as a csv in working_directory.
            (samples and analysis variables).
        :keyword int n_steps:
            Default is all time steps. If the problem is large, the evaluation of all time steps
            at once can cause a memory error. Then n_steps defines how many time_steps
            are evaluated at once in chunks. This increases the needed time exponentially and
            the simulation files must be saved.
        :keyword bool plot_result:
            Default True. If True, the results will be plotted.
        :return:
            Returns a pandas.DataFrame.
        :rtype: pandas.DataFrame
        """
        scale = kwargs.pop('scale', False)
        save_results = kwargs.pop('save_results', True)
        plot_result = kwargs.pop('plot_result', True)
        load_sim_files = kwargs.pop('load_sim_files', False)
        n_steps = kwargs.pop('n_steps', 'all')

        self.logger.info("Start time dependent sensitivity analysis.")
        if load_sim_files:
            # Reuse previously saved simulations and samples from savepath_sim.
            self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
            sim_dir = self.savepath_sim.joinpath(f'simulations_{cal_class.name}')
            samples_path = self.savepath_sim.joinpath(f'samples_{cal_class.name}.csv')
            samples = pd.read_csv(samples_path,
                                  header=0,
                                  index_col=0)
            samples = samples.to_numpy()
            # File names follow the convention used by simulate_samples.
            result_file_names = [f"simulation_{idx}.{self.suffix_files}" for idx in
                                 range(len(samples))]
            _filepaths = [sim_dir.joinpath(result_file_name) for result_file_name in
                          result_file_names]

            sen_time_dependent_list, time_index = self._load_analyze_tsteps(_filepaths=_filepaths,
                                                                            samples=samples,
                                                                            n_steps=n_steps,
                                                                            cal_class=cal_class)
            sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list, time_index)
        else:
            results, samples = self.simulate_samples(
                cal_class=cal_class,
                scale=scale
            )
            if self.save_files:
                # results holds filepaths here; analyze in (chunked) loads.
                sen_time_dependent_list, time_index = self._load_analyze_tsteps(_filepaths=results,
                                                                                samples=samples,
                                                                                n_steps=n_steps,
                                                                                cal_class=cal_class)
                sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list,
                                                                 time_index)
            else:
                # All results are in memory: analyze each time step directly
                # on the concatenated DataFrame of all simulations.
                variables = results[0].tsd.get_variable_names()
                time_index = results[0].index.to_numpy()
                total_result = _concat_all_sims(results)
                sen_time_dependent_list = []
                for time_step in time_index:
                    result_df_tstep = self._analyze_tstep_df(time_step=time_step,
                                                             tsteps_sim_results=total_result,
                                                             variables=variables,
                                                             samples=samples,
                                                             cal_class=cal_class)
                    sen_time_dependent_list.append(result_df_tstep)
                sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list,
                                                                 time_index)
        self.logger.info("Finished time dependent sensitivity analysys.")
        if save_results:
            self._save(sen_time_dependent_df, time_dependent=True)
        if plot_result:
            # A tuple means (first_order, second_order); only plot the first.
            if isinstance(sen_time_dependent_df, pd.DataFrame):
                plot_time_dependent(sen_time_dependent_df)
            else:
                plot_time_dependent(sen_time_dependent_df[0])
        return sen_time_dependent_df
785 def _analyze_tstep_df(self, time_step, tsteps_sim_results, variables, samples, cal_class):
786 """Analyze the sensitivity at a single time step."""
787 result_dict_tstep = {}
788 for var in variables:
789 result_tstep_var = tsteps_sim_results[var].loc[time_step].to_numpy()
790 if np.all(result_tstep_var == result_tstep_var[0]):
791 sen_tstep_var = None
792 else:
793 sen_tstep_var = self.analysis_function(
794 x=samples,
795 y=result_tstep_var
796 )
797 result_dict_tstep[var] = sen_tstep_var
798 result_df_tstep = self._conv_local_results(results=[result_dict_tstep],
799 local_classes=[cal_class])
800 return result_df_tstep
802 def _load_tsteps_df(self, tsteps, _filepaths):
803 """
804 Load all simulations and extract and concat the sim results of the time steps in tsteps.
805 """
806 self.logger.info(
807 f"Loading time steps from {tsteps[0]} to {tsteps[-1]} of the simulation files.")
808 tsteps_sim_results = []
809 for _filepath in _filepaths:
810 sim = _load_single_file(_filepath)
811 tsteps_sim_results.append(sim.loc[tsteps[0]:tsteps[-1]])
812 tsteps_sim_results = _concat_all_sims(tsteps_sim_results)
813 return tsteps_sim_results
815 def _load_analyze_tsteps(self, _filepaths, samples, n_steps, cal_class):
816 """
817 Load and analyze all time steps in chunks with n_steps time steps.
818 """
819 sim1 = _load_single_file(_filepaths[0])
820 time_index = sim1.index.to_numpy()
821 variables = sim1.tsd.get_variable_names()
822 sen_time_dependent_list = []
823 if n_steps == 'all':
824 list_tsteps = [time_index]
825 elif isinstance(n_steps, int) and not (n_steps <= 0 or n_steps > len(time_index)):
826 list_tsteps = _divide_chunks(time_index, n_steps)
827 else:
828 raise ValueError(
829 f"n_steps can only be between 1 and {len(time_index)} or the string all.")
831 for tsteps in list_tsteps:
832 tsteps_sim_results = self._load_tsteps_df(tsteps=tsteps, _filepaths=_filepaths)
833 self.logger.info("Analyzing these time steps.")
834 for tstep in tsteps:
835 result_df_tstep = self._analyze_tstep_df(time_step=tstep,
836 tsteps_sim_results=tsteps_sim_results,
837 variables=variables,
838 samples=samples,
839 cal_class=cal_class)
840 sen_time_dependent_list.append(result_df_tstep)
841 return sen_time_dependent_list, time_index
843 def _conv_global_result(self, result: dict, cal_class: CalibrationClass,
844 analysis_variable: str):
845 glo_res_dict = self._get_res_dict(result=result, cal_class=cal_class,
846 analysis_variable=analysis_variable)
847 return pd.DataFrame(glo_res_dict, index=['global'])
849 def _conv_local_results(self, results: list, local_classes: list):
850 """
851 Convert the result dictionaries form SALib of each class and goal into one DataFrame.
852 Overwritten for Sobol.
853 """
854 _conv_results = []
855 tuples = []
856 for class_results, local_class in zip(results, local_classes):
857 for goal, goal_results in class_results.items():
858 for analysis_var in self.analysis_variables:
859 _conv_results.append(self._get_res_dict(result=goal_results,
860 cal_class=local_class,
861 analysis_variable=analysis_var))
862 tuples.append((local_class.name, goal, analysis_var))
863 index = pd.MultiIndex.from_tuples(tuples=tuples,
864 names=['Class', 'Goal', 'Analysis variable'])
865 df = pd.DataFrame(_conv_results, index=index)
866 return df
    @abc.abstractmethod
    def _get_res_dict(self, result: dict, cal_class: CalibrationClass, analysis_variable: str):
        """
        Convert the result object to a dict with the key
        being the variable name and the value being the result
        associated to analysis_variable.

        :param dict result:
            Raw result object returned by the analysis function.
        :param CalibrationClass cal_class:
            Calibration class the result belongs to.
        :param str analysis_variable:
            The sensitivity measure to extract
            (one of ``self.analysis_variables``).
        :raises NotImplementedError:
            Always; subclasses must override this method.
        """
        raise NotImplementedError
877 def plot(self, result):
878 """
879 Plot the results of the sensitivity analysis method from run().
881 :param pd.DataFrame result:
882 Dataframe of the results like from the run() function.
883 :return tuple of matplotlib objects (fig, ax):
884 """
885 plot_single(result=result)
887 @staticmethod
888 def load_from_csv(path):
889 """
890 Load sensitivity results which were saved with the run() or run_time_dependent() function.
892 For second order results use the load_second_order_from_csv() function of the SobolAnalyzer.
893 """
894 result = pd.read_csv(path, index_col=[0, 1, 2])
895 return result
897 def save_for_reproduction(self,
898 title: str,
899 path: Path = None,
900 files: list = None,
901 exclude_sim_files: bool = False,
902 remove_saved_files: bool = False,
903 **kwargs):
904 """
905 Save the settings of the SenAnalyzer and SimApi in order to
906 reproduce the simulations and sensitivity analysis method.
907 All saved results will be also saved in the reproduction
908 archive. The simulations can be excluded from saving.
910 :param str title:
911 Title of the study
912 :param Path path:
913 Where to store the .zip file. If not given, self.working_directory is used.
914 :param list files:
915 List of files to save along the standard ones.
916 Examples would be plots, tables etc.
917 :param bool exclude_sim_files:
918 Default False. If True, the simulation files will not be saved in
919 the reproduction archive.
920 :param bool remove_saved_files:
921 Default False. If True, the result and simulation files will be moved
922 instead of just copied.
923 :param dict kwargs:
924 All keyword arguments except title, files, and path of the function
925 `save_reproduction_archive`. Most importantly, `log_message` may be
926 specified to avoid input during execution.
927 """
928 if files is None:
929 files = []
931 for file_path in self.reproduction_files:
932 if exclude_sim_files:
933 if 'simulation' in str(file_path):
934 continue
935 filename = "SenAnalyzer" + \
936 str(file_path).rsplit(self.working_directory.name, maxsplit=1)[-1]
937 files.append(CopyFile(
938 sourcepath=file_path,
939 filename=filename,
940 remove=remove_saved_files
941 ))
943 return self.sim_api.save_for_reproduction(
944 title=title,
945 path=path,
946 files=files,
947 **kwargs
948 )