Source code for aixcalibuha.sensitivity_analysis.sensitivity_analyzer

"""Package containing modules for sensitivity analysis.
The module contains the relevant base-classes."""
import abc
import copy
import os
import pathlib
import multiprocessing as mp
from typing import List
from collections import Counter
import numpy as np
import pandas as pd
from ebcpy.utils import setup_logger
from ebcpy.utils.reproduction import CopyFile
from ebcpy.simulationapi import SimulationAPI
from aixcalibuha import CalibrationClass, data_types
from aixcalibuha import utils
from aixcalibuha.sensitivity_analysis.plotting import plot_single, plot_time_dependent


def _load_single_file(_filepath, parquet_engine='pyarrow'):
    """Helper function"""
    if _filepath is None:
        return None
    return data_types.TimeSeriesData(_filepath, default_tag='sim', key='simulation',
                                     engine=parquet_engine)


def _load_files(_filepaths, parquet_engine='pyarrow'):
    """Helper function"""
    results = []
    for _filepath in _filepaths:
        results.append(_load_single_file(_filepath, parquet_engine=parquet_engine))
    return results


def _restruct_verbose(list_output_verbose):
    """Helper function"""
    output_verbose = {}
    for key, val in list_output_verbose[0].items():
        output_verbose[key] = np.array([])
    for i in list_output_verbose:
        for key, val in i.items():
            output_verbose[key] = np.append(output_verbose[key], np.array([val[1]]))
    return output_verbose
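

# Sketch of the restructuring above (hypothetical goal name): given
# list_output_verbose = [{'goal_a': (0.5, 1.2)}, {'goal_a': (0.5, 0.9)}],
# where each tuple is (weighting, unweighted error), the result is
# {'goal_a': array([1.2, 0.9])}, i.e. one error array per goal across
# all simulations.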


def _concat_all_sims(sim_results_list):
    """Helper function that concat all results in a list to one DataFrame."""
    sim_results_list = [r.to_df() for r in sim_results_list]
    sim_results_list = pd.concat(sim_results_list, keys=range(len(sim_results_list)),
                                 axis='columns')
    sim_results_list = sim_results_list.swaplevel(axis=1).sort_index(axis=1)
    return sim_results_list


def _restruct_time_dependent(sen_time_dependent_list, time_index):
    """Helper function that restructures the time dependent sensitivity results."""

    def _restruct_single(sen_time_dependent_list_s, second_order=False):
        sen_time_dependent_df = pd.concat(sen_time_dependent_list_s, keys=time_index, axis=0)
        sen_time_dependent_df = sen_time_dependent_df.droplevel('Class', axis='index')
        sen_time_dependent_df = sen_time_dependent_df.swaplevel(0, 1)
        sen_time_dependent_df = sen_time_dependent_df.swaplevel(1, 2).sort_index(axis=0)
        if second_order:
            sen_time_dependent_df = sen_time_dependent_df.swaplevel(2, 3).sort_index(axis=0)
            sen_time_dependent_df.index.set_names(
                ['Goal', 'Analysis variable', 'Interaction', 'time'], inplace=True)
        else:
            sen_time_dependent_df.index.set_names(['Goal', 'Analysis variable', 'time'],
                                                  inplace=True)
        return sen_time_dependent_df

    if isinstance(sen_time_dependent_list[0], tuple):
        sen_time_dependent_list1, sen_time_dependent_list2 = zip(*sen_time_dependent_list)
        return _restruct_single(sen_time_dependent_list1), _restruct_single(
            sen_time_dependent_list2, True)
    return _restruct_single(sen_time_dependent_list)
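

# After restructuring, the DataFrame has the index levels
# ['Goal', 'Analysis variable', 'time'] (plus 'Interaction' for second
# order results), which is the layout plot_time_dependent works on.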


def _divide_chunks(long_list, chunk_length):
    """Helper function that divides all list into multiple list with a specific chunk length."""
    for i in range(0, len(long_list), chunk_length):
        yield long_list[i:i + chunk_length]


class SenAnalyzer(abc.ABC):
    """
    Class to perform a Sensitivity Analysis.

    :param SimulationAPI sim_api:
        Simulation-API used to simulate the samples
    :param int num_samples:
        The parameter `N` to the sampler methods of sobol and morris.
        NOTE: This is not the number of samples produced, but relates to the
        total number of samples produced in a manner dependent on the sampler
        method used. See the documentation of the specific method in SALib
        for more information.
    :keyword str,os.path.normpath cd:
        The path for the current working directory.
        Logger and results will be stored here.
    :keyword boolean fail_on_error:
        Default is True. If True, the analysis will stop with an error
        if a simulation fails. See also: ``ret_val_on_error``
    :keyword float,np.NAN ret_val_on_error:
        Default is np.NAN. If ``fail_on_error`` is False, you can specify
        here which value to return in the case of a failed simulation.
        Possible options are np.NaN, np.inf or some other high number.
        Be aware that this may influence the solver.
    :keyword boolean save_files:
        Default False. If True, all simulation files for each iteration
        will be saved!
    :keyword str suffix_files:
        Default 'csv'. Specifies the data format in which to store the
        simulation files. Options are 'csv', 'hdf', 'parquet'.
    :keyword str parquet_engine:
        The engine to use for the data format parquet.
        Supported options can be extracted from the
        ebcpy.TimeSeriesData.save() function. Default is 'pyarrow'.
    :keyword str,os.path.normpath savepath_sim:
        Default is cd. Separate directory for the time series data sets of
        all simulations during the sensitivity analysis. A separate directory
        can be necessary for large data sets, because they can crash the IDE
        during indexing when they are in the project folder.
    """

    def __init__(self,
                 sim_api: SimulationAPI,
                 num_samples: int,
                 **kwargs):
        """Instantiate class parameters"""
        # Setup the instance attributes
        self.sim_api = sim_api
        self.num_samples = num_samples

        # Update kwargs
        self.fail_on_error = kwargs.pop("fail_on_error", True)
        self.save_files = kwargs.pop("save_files", False)
        self.suffix_files = kwargs.pop('suffix_files', 'csv')
        self.parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
        self.ret_val_on_error = kwargs.pop("ret_val_on_error", np.NAN)
        self.cd = kwargs.pop("cd", os.getcwd())
        self.savepath_sim = kwargs.pop('savepath_sim', self.cd)
        if isinstance(self.cd, str):
            self.cd = pathlib.Path(self.cd)
        if isinstance(self.savepath_sim, str):
            self.savepath_sim = pathlib.Path(self.savepath_sim)

        # Setup the logger
        self.logger = setup_logger(cd=self.cd, name=self.__class__.__name__)

        # Setup default values
        self.problem: dict = None
        self.reproduction_files = []

    @property
    @abc.abstractmethod
    def analysis_variables(self) -> List[str]:
        """
        Indicate which variables can be selected for analysis.

        :return:
            A list of strings
        :rtype: List[str]
        """
        raise NotImplementedError(f'{self.__class__.__name__}.analysis_variables '
                                  f'property is not defined yet')

    @abc.abstractmethod
    def analysis_function(self, x, y):
        """
        Use the method to analyze the simulation results.

        :param np.array x:
            The `X` parameter of the method (the NumPy matrix containing the
            model inputs)
        :param np.array y:
            The NumPy array containing the model outputs
        """
        raise NotImplementedError(f'{self.__class__.__name__}.analysis_function '
                                  f'function is not defined yet')

    @abc.abstractmethod
    def create_sampler_demand(self) -> dict:
        """
        Return the sampler parameters.

        :return:
            dict: A dict with the sampler demand
        """
        raise NotImplementedError(f'{self.__class__.__name__}.create_sampler_demand '
                                  f'function is not defined yet')

    @abc.abstractmethod
    def generate_samples(self):
        """
        Run the sampler specified by `method` and return the results.

        :return:
            The list of samples generated as a NumPy array with one row per
            sample and each row containing one value for each variable name
            in `problem['names']`.
        :rtype: np.ndarray
        """
        raise NotImplementedError(f'{self.__class__.__name__}.generate_samples '
                                  f'function is not defined yet')

    def simulate_samples(self, cal_class, **kwargs):
        """
        Creates the samples for the calibration class and simulates them.

        :param cal_class:
            One class for calibration. Goals and tuner_paras have to be set.
        :keyword scale:
            Default is False. If True, the bounds of the tuner-parameters
            will be scaled between 0 and 1.
        :return:
            Returns two lists. First, a list with the simulation results for
            each sample; if save_files is True, the list contains the
            filepaths to the results. Second, a list of the samples.
        :rtype: list
        """
        scale = kwargs.pop('scale', False)
        # Set the output interval according to the given Goals
        mean_freq = cal_class.goals.get_meas_frequency()
        self.logger.info("Setting output_interval of simulation according "
                         "to measurement target data frequency: %s", mean_freq)
        self.sim_api.sim_setup.output_interval = mean_freq
        initial_names = cal_class.tuner_paras.get_names()
        self.sim_api.set_sim_setup({"start_time": cal_class.start_time,
                                    "stop_time": cal_class.stop_time})
        self.sim_api.result_names = cal_class.goals.get_sim_var_names()
        self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
        samples = self.generate_samples()
        # Create df of samples with the result_file_names as the index
        result_file_names = [f"simulation_{idx}" for idx in range(len(samples))]
        samples_df = pd.DataFrame(samples, columns=initial_names, index=result_file_names)
        samples_df.to_csv(self.cd.joinpath(f'samples_{cal_class.name}.csv'))
        # Simulate the current values
        parameters = []
        for initial_values in samples:
            if scale:
                initial_values = cal_class.tuner_paras.descale(initial_values)
            parameters.append(dict(zip(initial_names, initial_values)))

        self.logger.info('Starting %s parameter variations on %s cores',
                         len(samples), self.sim_api.n_cpu)
        if self.save_files:
            sim_dir = self.savepath_sim.joinpath(f'simulations_{cal_class.name}')
            os.makedirs(sim_dir, exist_ok=True)
            samples_df.to_csv(self.savepath_sim.joinpath(f'samples_{cal_class.name}.csv'))
            self.logger.info(f'Saving simulation files in: {sim_dir}')
            _filepaths = self.sim_api.simulate(
                parameters=parameters,
                return_option="savepath",
                savepath=sim_dir,
                result_file_name=result_file_names,
                result_file_suffix=self.suffix_files,
                parquet_engine=self.parquet_engine,
                fail_on_error=self.fail_on_error,
                inputs=cal_class.inputs,
                **cal_class.input_kwargs
            )
            self.reproduction_files.extend(_filepaths)
            results = _filepaths
        else:
            results = self.sim_api.simulate(
                parameters=parameters,
                inputs=cal_class.inputs,
                fail_on_error=self.fail_on_error,
                **cal_class.input_kwargs
            )
        self.logger.info('Finished %s simulations', len(samples))
        return results, samples
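
    # Usage sketch (assuming a concrete subclass such as SobolAnalyzer and a
    # configured simulation API; names are illustrative, not prescriptive):
    #
    #     sen_analyzer = SobolAnalyzer(sim_api=sim_api, num_samples=8)
    #     results, samples = sen_analyzer.simulate_samples(cal_class=cal_class)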

    def _check_index(self, tsd: data_types.TimeSeriesData, sim_num=None):
        freq = tsd.frequency
        if sim_num is None:
            sim_num = tsd.filepath.name
        if freq[0] != self.sim_api.sim_setup.output_interval:
            self.logger.info(
                f'The mean value of the frequency from {sim_num} does not match the output '
                f'interval. The index will be cleaned and spaced equally.')
            tsd.to_datetime_index()
            tsd.clean_and_space_equally(f'{str(self.sim_api.sim_setup.output_interval * 1000)}ms')
            tsd.to_float_index()
            freq = tsd.frequency
        if freq[1] > 0.0:
            self.logger.info(f'The standard deviation of the frequency from {sim_num} is too '
                             f'high and will be rounded to the accuracy of the output interval.')
            # Round the index to the number of decimals of the output interval
            tsd.index = np.round(tsd.index.astype("float64"),
                                 str(self.sim_api.sim_setup.output_interval)[::-1].find('.'))
        return tsd

    def _single_eval_statistical_measure(self, kwargs_eval):
        """Evaluates the statistical measure of a single result"""
        cal_class = kwargs_eval.pop('cal_class')
        result = kwargs_eval.pop('result')
        num_sim = kwargs_eval.pop('sim_num', None)
        if result is None:
            verbose_error = {}
            for goal, weight in zip(cal_class.goals.get_goals_list(),
                                    cal_class.goals.weightings):
                verbose_error[goal] = (weight, self.ret_val_on_error)
            return self.ret_val_on_error, verbose_error
        result = self._check_index(result, num_sim)
        cal_class.goals.set_sim_target_data(result)
        cal_class.goals.set_relevant_time_intervals(cal_class.relevant_intervals)
        # Evaluate the current objective
        total_res, verbose_calculation = cal_class.goals.eval_difference(verbose=True)
        return total_res, verbose_calculation

    def eval_statistical_measure(self, cal_class, results, verbose=True):
        """Evaluates the statistical measure of all results on a single core"""
        self.logger.info('Starting evaluation of statistical measure')
        output = []
        list_output_verbose = []
        for i, result in enumerate(results):
            total_res, verbose_calculation = self._single_eval_statistical_measure(
                {'cal_class': cal_class, 'result': result, 'sim_num': f'simulation_{i}'}
            )
            output.append(total_res)
            list_output_verbose.append(verbose_calculation)
        if verbose:
            # restructure output_verbose
            output_verbose = _restruct_verbose(list_output_verbose)
            return np.asarray(output), output_verbose
        return np.asarray(output)
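
    # Sketch: evaluating in-memory results (as returned by simulate_samples
    # with save_files=False); names are illustrative:
    #
    #     output_array, output_verbose = sen_analyzer.eval_statistical_measure(
    #         cal_class=cal_class, results=results)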

    def _single_load_eval_file(self, kwargs_load_eval):
        """For multiprocessing"""
        filepath = kwargs_load_eval.pop('filepath')
        _result = _load_single_file(filepath, self.parquet_engine)
        kwargs_load_eval.update({'result': _result})
        total_res, verbose_calculation = self._single_eval_statistical_measure(kwargs_load_eval)
        return total_res, verbose_calculation

    def _mp_load_eval(self, _filepaths, cal_class, n_cpu):
        """
        Load and evaluate the statistical measure of saved simulation files
        on multiple cores.
        """
        self.logger.info(f'Load files and evaluate statistical measure on {n_cpu} processes.')
        kwargs_load_eval = []
        for filepath in _filepaths:
            kwargs_load_eval.append({'filepath': filepath, 'cal_class': cal_class})
        output_array = []
        list_output_verbose = []
        with mp.Pool(processes=n_cpu) as pool:
            for total, verbose in pool.imap(self._single_load_eval_file, kwargs_load_eval):
                output_array.append(total)
                list_output_verbose.append(verbose)
        output_array = np.asarray(output_array)
        output_verbose = _restruct_verbose(list_output_verbose)
        return output_array, output_verbose

    def _load_eval(self, _filepaths, cal_class, n_cpu):
        """
        Load and evaluate the statistical measure of saved simulation files.
        Single- or multiprocessing is possible depending on n_cpu.
        """
        if n_cpu == 1:
            results = _load_files(_filepaths, self.parquet_engine)
            output_array, output_verbose = self.eval_statistical_measure(
                cal_class=cal_class,
                results=results
            )
            return output_array, output_verbose

        output_array, output_verbose = self._mp_load_eval(_filepaths, cal_class, n_cpu)
        return output_array, output_verbose

    def run(self, calibration_classes, merge_multiple_classes=True, **kwargs):
        """
        Execute the sensitivity analysis for each class and return the result.

        :param CalibrationClass,list calibration_classes:
            Either one or multiple classes for calibration with the same
            tuner-parameters.
        :param bool merge_multiple_classes:
            Default True. If False, the given list of calibration-classes is
            handled as-is. This means if you pass two CalibrationClass objects
            with the same name (e.g. "device on"), the calibration process
            will run for both these classes stand-alone. This will
            automatically yield an intersection of tuner-parameters; however,
            it may have advantages in some cases.
        :keyword bool verbose:
            Default False. If True, in addition to the combined Goals of the
            classes (saved under the index Goal: all), the sensitivity
            measures of the individual Goals will also be calculated and
            returned.
        :keyword scale:
            Default is False. If True, the bounds of the tuner-parameters
            will be scaled between 0 and 1.
        :keyword bool use_first_sim:
            Default False. If True, the simulations of the first calibration
            class will be used for all other calibration classes with their
            relevant time intervals. The simulations must be stored on a
            hard-drive, so this must be used together with either save_files
            or load_sim_files.
        :keyword int n_cpu:
            Default is 1. The number of processes to use for the evaluation
            of the statistical measure. For n_cpu > 1, only one simulation
            file is loaded at once per process and dumped directly after the
            evaluation of the statistical measure, so that only minimal
            memory is used. Use this option for large analyses.
            Only implemented for save_files=True or load_sim_files=True.
        :keyword bool load_sim_files:
            Default False. If True, no new simulations are run and old
            simulations are loaded. The simulations and corresponding samples
            will be loaded from self.savepath_sim as they were saved via
            self.save_files. Currently, the name of the sim folder must be
            "simulations_CAL_CLASS_NAME" and for the samples
            "samples_CAL_CLASS_NAME". The usage of the same simulations for
            different calibration classes is not supported yet.
        :keyword bool save_results:
            Default True. If True, all results are saved as csv files in cd
            (samples, statistical measures and analysis variables).
        :keyword bool plot_result:
            Default True. If True, the results will be plotted.
        :return:
            Returns a pandas.DataFrame. The DataFrame has a MultiIndex with
            the levels Class, Goal and Analysis variable. The Goal name of
            combined goals is 'all'. The variables are the tuner-parameters.
            For the Sobol method with calc_second_order, a tuple of
            DataFrames (df_1, df_2) is returned, where df_2 contains the
            second order analysis variables and has an extra index level
            Interaction, which also contains the variables.
        :rtype: pandas.DataFrame
        """
        verbose = kwargs.pop('verbose', False)
        scale = kwargs.pop('scale', False)
        use_first_sim = kwargs.pop('use_first_sim', False)
        n_cpu = kwargs.pop('n_cpu', 1)
        save_results = kwargs.pop('save_results', True)
        plot_result = kwargs.pop('plot_result', True)
        load_sim_files = kwargs.pop('load_sim_files', False)
        # Check correct input
        calibration_classes = utils.validate_cal_class_input(calibration_classes)
        # Merge the classes to avoid possible intersections of tuner-parameters
        if merge_multiple_classes:
            calibration_classes = data_types.merge_calibration_classes(calibration_classes)

        # Check n_cpu
        if n_cpu > mp.cpu_count():
            raise ValueError(f"Given n_cpu '{n_cpu}' is greater "
                             "than the available number of "
                             f"cpus on your machine '{mp.cpu_count()}'")

        # Check if the simulations of the first calibration class can be used for all classes
        if use_first_sim:
            if not self.save_files and not load_sim_files:
                raise AttributeError('To use the simulations of the first calibration class '
                                     'for all classes, the simulation files must be saved. '
                                     'Either set save_files=True or load already existing '
                                     'files with load_sim_files=True.')
            start_time = 0
            stop_time = 0
            for idx, cal_class in enumerate(calibration_classes):
                if idx == 0:
                    start_time = cal_class.start_time
                    stop_time = cal_class.stop_time
                    continue
                if start_time > cal_class.start_time or stop_time < cal_class.stop_time:
                    raise ValueError(f'To use the simulations of the first calibration class '
                                     f'for all classes, the start and stop times of the other '
                                     f'classes must be in the interval '
                                     f'[{start_time}, {stop_time}] '
                                     f'of the first calibration class.')

        all_results = []
        for idx, cal_class in enumerate(calibration_classes):
            self.logger.info('Start sensitivity analysis of class: %s, '
                             'Time-Interval: %s-%s s', cal_class.name,
                             cal_class.start_time, cal_class.stop_time)

            # Generate list with metrics of every parameter variation
            results_goals = {}
            if load_sim_files:
                self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
                if use_first_sim:
                    class_name = calibration_classes[0].name
                else:
                    class_name = cal_class.name
                sim_dir = self.savepath_sim.joinpath(f'simulations_{class_name}')
                samples_path = self.savepath_sim.joinpath(f'samples_{class_name}.csv')
                self.logger.info(f'Loading samples from {samples_path}')
                samples = pd.read_csv(samples_path, header=0, index_col=0)
                samples = samples.to_numpy()
                result_file_names = [f"simulation_{idx}.{self.suffix_files}"
                                     for idx in range(len(samples))]
                _filepaths = [sim_dir.joinpath(result_file_name)
                              for result_file_name in result_file_names]
                self.logger.info(f'Loading simulation files from {sim_dir}')
                output_array, output_verbose = self._load_eval(_filepaths, cal_class, n_cpu)
            else:
                results, samples = self.simulate_samples(
                    cal_class=cal_class,
                    scale=scale
                )
                if self.save_files:
                    output_array, output_verbose = self._load_eval(results, cal_class, n_cpu)
                else:
                    output_array, output_verbose = self.eval_statistical_measure(
                        cal_class=cal_class,
                        results=results
                    )
                if use_first_sim:
                    load_sim_files = True

            # Combine output_array and output_verbose.
            # Set the key for output_array depending on one or multiple goals.
            stat_mea = {'all': output_array}
            if len(output_verbose) == 1:
                stat_mea = output_verbose
            if len(output_verbose) > 1 and verbose:
                stat_mea.update(output_verbose)

            # Save statistical measure and corresponding samples for each cal_class in cd
            if save_results:
                result_file_names = [f"simulation_{idx}" for idx in range(len(output_array))]
                stat_mea_df = pd.DataFrame(stat_mea, index=result_file_names)
                savepath_stat_mea = self.cd.joinpath(
                    f'{cal_class.goals.statistical_measure}_{cal_class.name}.csv')
                stat_mea_df.to_csv(savepath_stat_mea)
                self.reproduction_files.append(savepath_stat_mea)
                samples_df = pd.DataFrame(samples,
                                          columns=cal_class.tuner_paras.get_names(),
                                          index=result_file_names)
                savepath_samples = self.cd.joinpath(f'samples_{cal_class.name}.csv')
                samples_df.to_csv(savepath_samples)
                self.reproduction_files.append(savepath_samples)

            self.logger.info('Starting calculation of analysis variables')
            for key, val in stat_mea.items():
                result_goal = self.analysis_function(
                    x=samples,
                    y=val
                )
                results_goals[key] = result_goal
            all_results.append(results_goals)
            self.logger.info('Finished sensitivity analysis of class: %s, '
                             'Time-Interval: %s-%s s', cal_class.name,
                             cal_class.start_time, cal_class.stop_time)
        result = self._conv_local_results(results=all_results,
                                          local_classes=calibration_classes)
        if save_results:
            self._save(result)
        if plot_result:
            self.plot(result)
        return result, calibration_classes
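
    # Usage sketch for run() (assuming a concrete subclass, e.g. SobolAnalyzer;
    # the returned tuple also contains the possibly merged calibration classes):
    #
    #     result, classes = sen_analyzer.run(calibration_classes=classes,
    #                                        verbose=True, plot_result=False)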

    def _save(self, result: pd.DataFrame, time_dependent: bool = False):
        """
        Save the result DataFrame of run and run_time_dependent.
        Needs to be overwritten for Sobol results.
        """
        if time_dependent:
            savepath_result = self.cd.joinpath(f'{self.__class__.__name__}_results_time.csv')
        else:
            savepath_result = self.cd.joinpath(f'{self.__class__.__name__}_results.csv')
        result.to_csv(savepath_result)
        self.reproduction_files.append(savepath_result)

    @staticmethod
    def create_problem(tuner_paras, scale=False) -> dict:
        """Create the problem definition for later access if multiple calibration-classes are used."""
        num_vars = len(tuner_paras.get_names())
        bounds = np.array(tuner_paras.get_bounds())
        if scale:
            bounds = [np.zeros_like(bounds[0]), np.ones_like(bounds[1])]
        problem = {'num_vars': num_vars,
                   'names': tuner_paras.get_names(),
                   'bounds': np.transpose(bounds)}
        return problem
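
    # For example, two hypothetical tuner parameters with bounds (0, 10) and
    # (2, 4) yield a SALib-style problem dict (assuming get_bounds() returns
    # the lower and upper bound arrays):
    #
    #     {'num_vars': 2,
    #      'names': ['heatConv_a', 'heatConv_b'],
    #      'bounds': array([[ 0., 10.],
    #                       [ 2.,  4.]])}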

    @staticmethod
    def select_by_threshold(calibration_classes, result, analysis_variable, threshold):
        """
        Automatically select sensitive tuner parameters based on a given
        threshold of a given analysis variable from a sensitivity result.
        Uses only the combined goals.

        :param list calibration_classes:
            List of aixcalibuha.data_types.CalibrationClass objects for which
            you want to automatically select sensitive tuner-parameters.
        :param pd.DataFrame result:
            Result object of a sensitivity analysis run
        :param str analysis_variable:
            Analysis variable to use for the selection
        :param float threshold:
            Minimal required value of the given key
        :return: list calibration_classes
        """
        for cal_class in calibration_classes:
            first_goal = result.index.get_level_values(1)[0]
            class_result = result.loc[cal_class.name, first_goal, analysis_variable]
            tuner_paras = copy.deepcopy(cal_class.tuner_paras)
            select_names = class_result[class_result < threshold].index.values
            tuner_paras.remove_names(select_names)
            if not tuner_paras.get_names():
                raise ValueError(
                    'Automatic selection removed all tuner parameters '
                    f'from class {cal_class.name} after '
                    'the sensitivity analysis was done. Please adjust the '
                    'threshold in the json or manually choose tuner '
                    'parameters for the calibration.')
            # cal_class.set_tuner_paras(tuner_paras)
            cal_class.tuner_paras = tuner_paras
        return calibration_classes
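
    # Usage sketch (hypothetical threshold; `result` as returned by run()):
    #
    #     classes = SenAnalyzer.select_by_threshold(
    #         calibration_classes=classes, result=result,
    #         analysis_variable='S1', threshold=0.01)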

    @staticmethod
    def select_by_threshold_verbose(calibration_class: CalibrationClass,
                                    result: pd.DataFrame,
                                    analysis_variable: str,
                                    threshold: float,
                                    calc_names_for_selection: List[str] = None):
        """
        Select tuner-parameters of a single calibration class with verbose
        sensitivity results. This function selects tuner-parameters if their
        sensitivity is equal to or greater than the threshold in just one
        target value of one calibration class in the sensitivity result.
        This can be more robust, because a small sensitivity in one target
        value and state of the system can mean that the parameter can also
        be calibrated in a global calibration class which calibrates
        multiple states and target values at the same time, and which
        therefore does not directly have the same sensitivity as in the
        isolated view of a calibration class for only one state.

        :param CalibrationClass calibration_class:
            The calibration class from which the tuner parameters will be selected.
        :param pd.DataFrame result:
            Sensitivity results to use for the selection. Can include multiple classes.
        :param str analysis_variable:
            The analysis variable to use for the selection.
        :param float threshold:
            Minimal required value of the given analysis variable.
        :param List[str] calc_names_for_selection:
            Specifies which calibration classes in the sensitivity results
            will be used for the selection. Default are all classes.
        """
        if Counter(calibration_class.tuner_paras.get_names()) != Counter(list(result.columns)):
            raise NameError("The tuner-parameters of the calibration class do not "
                            "match the tuner-parameters in the sensitivity result. "
                            "They have to match.")

        result = result.loc[:, :, analysis_variable]
        calc_names_results = result.index.get_level_values("Class").unique()
        if calc_names_for_selection:
            for cal_class in calc_names_for_selection:
                if cal_class not in calc_names_results:
                    raise NameError(f"The calibration class name {cal_class} "
                                    f"does not match any class name "
                                    f"in the given sensitivity result.")
            result = result.loc[calc_names_for_selection, :, :]

        selected_tuners = (result >= threshold).any()

        remove_tuners = []
        for tuner, selected in selected_tuners.items():
            if not selected:
                remove_tuners.append(tuner)
        tuner_paras = copy.deepcopy(calibration_class.tuner_paras)
        tuner_paras.remove_names(remove_tuners)
        if not tuner_paras.get_names():
            raise ValueError("Threshold too small. All tuner-parameters would be removed.")
        calibration_class.tuner_paras = tuner_paras
        return calibration_class
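
    # Usage sketch (hypothetical threshold; `result` is a verbose result
    # DataFrame as produced by run() with verbose=True):
    #
    #     cal_class = SenAnalyzer.select_by_threshold_verbose(
    #         calibration_class=cal_class, result=result,
    #         analysis_variable='S1', threshold=0.01)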

    def run_time_dependent(self, cal_class: CalibrationClass, **kwargs):
        """
        Calculate the time dependent sensitivity for all the single goals
        in the calibration class.

        :param CalibrationClass cal_class:
            Calibration class with tuner-parameters to calculate the
            sensitivity for. Can include dummy target data.
        :keyword scale:
            Default is False. If True, the bounds of the tuner-parameters
            will be scaled between 0 and 1.
        :keyword bool load_sim_files:
            Default False. If True, no new simulations are run and old
            simulations are loaded. The simulations and corresponding samples
            will be loaded from self.savepath_sim as they were saved via
            self.save_files. Currently, the name of the sim folder must be
            "simulations_CAL_CLASS_NAME" and for the samples
            "samples_CAL_CLASS_NAME".
        :keyword bool save_results:
            Default True. If True, all results are saved as csv files in cd
            (samples and analysis variables).
        :keyword int n_steps:
            Default is all time steps. If the problem is large, the
            evaluation of all time steps at once can cause a memory error.
            Then n_steps defines how many time steps are evaluated at once in
            chunks. This significantly increases the needed time and the
            simulation files must be saved.
        :keyword bool plot_result:
            Default True. If True, the results will be plotted.
        :return:
            Returns a pandas.DataFrame.
        :rtype: pandas.DataFrame
        """
        scale = kwargs.pop('scale', False)
        save_results = kwargs.pop('save_results', True)
        plot_result = kwargs.pop('plot_result', True)
        load_sim_files = kwargs.pop('load_sim_files', False)
        n_steps = kwargs.pop('n_steps', 'all')

        self.logger.info("Start time dependent sensitivity analysis.")
        if load_sim_files:
            self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
            sim_dir = self.savepath_sim.joinpath(f'simulations_{cal_class.name}')
            samples_path = self.savepath_sim.joinpath(f'samples_{cal_class.name}.csv')
            samples = pd.read_csv(samples_path, header=0, index_col=0)
            samples = samples.to_numpy()
            result_file_names = [f"simulation_{idx}.{self.suffix_files}"
                                 for idx in range(len(samples))]
            _filepaths = [sim_dir.joinpath(result_file_name)
                          for result_file_name in result_file_names]
            sen_time_dependent_list, time_index = self._load_analyze_tsteps(
                _filepaths=_filepaths,
                samples=samples,
                n_steps=n_steps,
                cal_class=cal_class
            )
            sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list, time_index)
        else:
            results, samples = self.simulate_samples(
                cal_class=cal_class,
                scale=scale
            )
            if self.save_files:
                sen_time_dependent_list, time_index = self._load_analyze_tsteps(
                    _filepaths=results,
                    samples=samples,
                    n_steps=n_steps,
                    cal_class=cal_class
                )
                sen_time_dependent_df = _restruct_time_dependent(
                    sen_time_dependent_list, time_index)
            else:
                variables = results[0].get_variable_names()
                time_index = results[0].index.to_numpy()
                total_result = _concat_all_sims(results)
                sen_time_dependent_list = []
                for time_step in time_index:
                    result_df_tstep = self._analyze_tstep_df(
                        time_step=time_step,
                        tsteps_sim_results=total_result,
                        variables=variables,
                        samples=samples,
                        cal_class=cal_class
                    )
                    sen_time_dependent_list.append(result_df_tstep)
                sen_time_dependent_df = _restruct_time_dependent(
                    sen_time_dependent_list, time_index)
        self.logger.info("Finished time dependent sensitivity analysis.")
        if save_results:
            self._save(sen_time_dependent_df, time_dependent=True)
        if plot_result:
            if isinstance(sen_time_dependent_df, pd.DataFrame):
                plot_time_dependent(sen_time_dependent_df)
            else:
                plot_time_dependent(sen_time_dependent_df[0])
        return sen_time_dependent_df
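
    # Usage sketch (illustrative kwargs; chunked evaluation via n_steps
    # requires saved simulation files):
    #
    #     sen_time_dependent_df = sen_analyzer.run_time_dependent(
    #         cal_class=cal_class, load_sim_files=True, n_steps=1000)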

    def _analyze_tstep_df(self, time_step, tsteps_sim_results, variables, samples, cal_class):
        """Analyze the sensitivity at a single time step."""
        result_dict_tstep = {}
        for var in variables:
            result_tstep_var = tsteps_sim_results[var].loc[time_step].to_numpy()
            if np.all(result_tstep_var == result_tstep_var[0]):
                sen_tstep_var = None
            else:
                sen_tstep_var = self.analysis_function(
                    x=samples,
                    y=result_tstep_var
                )
            result_dict_tstep[var] = sen_tstep_var
        result_df_tstep = self._conv_local_results(results=[result_dict_tstep],
                                                   local_classes=[cal_class])
        return result_df_tstep

    def _load_tsteps_df(self, tsteps, _filepaths):
        """
        Load all simulations and extract and concatenate the simulation
        results of the time steps in tsteps.
        """
        self.logger.info(
            f"Loading time steps from {tsteps[0]} to {tsteps[-1]} of the simulation files.")
        tsteps_sim_results = []
        for _filepath in _filepaths:
            sim = _load_single_file(_filepath, parquet_engine=self.parquet_engine)
            tsteps_sim_results.append(sim.loc[tsteps[0]:tsteps[-1]])
        tsteps_sim_results = _concat_all_sims(tsteps_sim_results)
        return tsteps_sim_results

    def _load_analyze_tsteps(self, _filepaths, samples, n_steps, cal_class):
        """
        Load and analyze all time steps in chunks of n_steps time steps.
        """
        sim1 = _load_single_file(_filepaths[0], parquet_engine=self.parquet_engine)
        time_index = sim1.index.to_numpy()
        variables = sim1.get_variable_names()
        sen_time_dependent_list = []
        if n_steps == 'all':
            list_tsteps = [time_index]
        elif isinstance(n_steps, int) and not (n_steps <= 0 or n_steps > len(time_index)):
            list_tsteps = _divide_chunks(time_index, n_steps)
        else:
            raise ValueError(
                f"n_steps can only be between 1 and {len(time_index)} or the string 'all'.")

        for tsteps in list_tsteps:
            tsteps_sim_results = self._load_tsteps_df(tsteps=tsteps, _filepaths=_filepaths)
            self.logger.info("Analyzing these time steps.")
            for tstep in tsteps:
                result_df_tstep = self._analyze_tstep_df(time_step=tstep,
                                                         tsteps_sim_results=tsteps_sim_results,
                                                         variables=variables,
                                                         samples=samples,
                                                         cal_class=cal_class)
                sen_time_dependent_list.append(result_df_tstep)
        return sen_time_dependent_list, time_index

    def _conv_global_result(self, result: dict, cal_class: CalibrationClass,
                            analysis_variable: str):
        glo_res_dict = self._get_res_dict(result=result, cal_class=cal_class,
                                          analysis_variable=analysis_variable)
        return pd.DataFrame(glo_res_dict, index=['global'])

    def _conv_local_results(self, results: list, local_classes: list):
        """
        Convert the result dictionaries from SALib of each class and goal
        into one DataFrame. Overwritten for Sobol.
        """
        _conv_results = []
        tuples = []
        for class_results, local_class in zip(results, local_classes):
            for goal, goal_results in class_results.items():
                for analysis_var in self.analysis_variables:
                    _conv_results.append(self._get_res_dict(result=goal_results,
                                                            cal_class=local_class,
                                                            analysis_variable=analysis_var))
                    tuples.append((local_class.name, goal, analysis_var))
        index = pd.MultiIndex.from_tuples(tuples=tuples,
                                          names=['Class', 'Goal', 'Analysis variable'])
        df = pd.DataFrame(_conv_results, index=index)
        return df

    @abc.abstractmethod
    def _get_res_dict(self, result: dict, cal_class: CalibrationClass,
                      analysis_variable: str):
        """
        Convert the result object to a dict with the key being the variable
        name and the value being the result associated with analysis_variable.
        """
        raise NotImplementedError

    def plot(self, result):
        """
        Plot the results of the sensitivity analysis method from run().

        :param pd.DataFrame result:
            DataFrame of the results, as returned by the run() function.
        :return tuple of matplotlib objects (fig, ax):
        """
        return plot_single(result=result)

    @staticmethod
    def load_from_csv(path):
        """
        Load sensitivity results which were saved with the run() or
        run_time_dependent() function.

        For second order results, use the load_second_order_from_csv()
        function of the SobolAnalyzer.
        """
        result = pd.read_csv(path, index_col=[0, 1, 2])
        return result
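
    # Example (hypothetical file name, following the naming scheme of _save()):
    #
    #     result = SenAnalyzer.load_from_csv('SobolAnalyzer_results.csv')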

    def save_for_reproduction(self,
                              title: str,
                              path: pathlib.Path = None,
                              files: list = None,
                              exclude_sim_files: bool = False,
                              remove_saved_files: bool = False,
                              **kwargs):
        """
        Save the settings of the SenAnalyzer and SimApi in order to
        reproduce the simulations and the sensitivity analysis method.
        All saved results will also be saved in the reproduction archive.
        The simulations can be excluded from saving.

        :param str title:
            Title of the study
        :param pathlib.Path path:
            Where to store the .zip file. If not given, self.cd is used.
        :param list files:
            List of files to save along with the standard ones.
            Examples would be plots, tables etc.
        :param bool exclude_sim_files:
            Default False. If True, the simulation files will not be saved
            in the reproduction archive.
        :param bool remove_saved_files:
            Default False. If True, the result and simulation files will be
            moved instead of just copied.
        :param dict kwargs:
            All keyword arguments except title, files, and path of the
            function `save_reproduction_archive`. Most importantly,
            `log_message` may be specified to avoid input during execution.
        """
        if files is None:
            files = []

        for file_path in self.reproduction_files:
            if exclude_sim_files:
                if 'simulation' in str(file_path):
                    continue
            filename = "SenAnalyzer" + str(file_path).rsplit(self.cd.name, maxsplit=1)[-1]
            files.append(CopyFile(
                sourcepath=file_path,
                filename=filename,
                remove=remove_saved_files
            ))

        return self.sim_api.save_for_reproduction(
            title=title,
            path=path,
            files=files,
            **kwargs
        )
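

# End-to-end sketch of the intended workflow (all names are illustrative and
# depend on the concrete SimulationAPI and analyzer subclass used):
#
#     from aixcalibuha import SobolAnalyzer
#
#     sen_analyzer = SobolAnalyzer(sim_api=sim_api, num_samples=8,
#                                  save_files=True, suffix_files='parquet')
#     result, classes = sen_analyzer.run(calibration_classes=calibration_classes)
#     # For the SobolAnalyzer with calc_second_order, `result` is a tuple
#     # (df_first_and_total, df_second_order); use the first element for the
#     # threshold selection.
#     classes = SobolAnalyzer.select_by_threshold(
#         calibration_classes=classes, result=result[0],
#         analysis_variable='S1', threshold=0.01)
#     sen_analyzer.save_for_reproduction(title='sen_study')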