"""Base-module for the whole optimization pacakge.
Used to define Base-Classes such as Optimizer and
Calibrator."""
import os
from pathlib import Path
import warnings
from typing import List, Tuple, Union
from collections import namedtuple
from abc import abstractmethod
import numpy as np
from ebcpy.utils import setup_logger
# pylint: disable=import-outside-toplevel
# pylint: disable=broad-except
class Optimizer:
    """
    Base class for optimization in ebcpy. All classes
    performing optimization tasks must inherit from this
    class.
    The main feature of this class is the common interface
    for different available solvers in python. This makes the
    testing of different solvers and methods more easy.
    For available frameworks/solvers, check the function
    self.optimize().

    :param str,Path working_directory:
        Directory for storing all output of optimization via a logger.
    :keyword list bounds:
        The boundaries for the optimization variables.
    """

    # Used to display number of obj-function-calls
    _counter = 0
    # Used to access the current parameter set if an optimization-step fails
    _current_iterate = np.array([])
    # Used to access the best iterate if an optimization step fails
    _current_best_iterate = {"Objective": np.inf}
    # List storing every objective value for plotting and logging.
    # Can be used, but will enlarge runtime
    _obj_his = []

    def __init__(self, working_directory: Union[Path, str] = None, **kwargs):
        """
        Instantiate class parameters.

        :param str,Path working_directory:
            Directory used by the logger. If None, the deprecated keyword
            ``cd`` is used as a fallback (with a DeprecationWarning).
        :keyword list bounds:
            The boundaries for the optimization variables.
        """
        # Give every instance its own mutable containers. Without the copy,
        # in-place changes (e.g. appending to the history) would modify the
        # class-level objects shared by all instances.
        self._current_best_iterate = dict(self._current_best_iterate)
        self._obj_his = list(self._obj_his)

        if working_directory is None and "cd" in kwargs:
            warnings.warn(
                "cd was renamed to working_directory in all classes. "
                "Use working_directory instead.",
                category=DeprecationWarning,
                stacklevel=2,
            )
            self.working_directory = kwargs["cd"]
        elif working_directory is None:
            self._working_directory = None
        else:
            self.working_directory = working_directory

        self.logger = setup_logger(
            working_directory=self.working_directory,
            name=self.__class__.__name__
        )
        # Set kwargs
        self.bounds = kwargs.get("bounds", None)

    # NOTE: The class does not derive from abc.ABC, hence ``abstractmethod``
    # is informational only and does not prevent instantiation.
    @abstractmethod
    def obj(self, xk, *args):
        """
        Base objective function. Overload this function and create your own
        objective function. Make sure that the return value is a scalar.
        Furthermore, the parameter vector xk is always a numpy array.

        :param np.array xk:
            Array with parameters for optimization

        :return: float result:
            A scalar (float/ 1d) value for the optimization framework.
        :raises NotImplementedError:
            Always, unless the function is overloaded.
        """
        raise NotImplementedError(f'{self.__class__.__name__}.obj function is not defined')

    @abstractmethod
    def mp_obj(self, x, *args):
        """
        Objective function for Multiprocessing.

        :param np.array x:
            Array with parameters for optimization.
            Shape of the array is (number_of_evaluations x number_of_variables).
            For instance, optimizating 10 variables and evaluating
            900 objectives in parallel, the shape would be 900 x 10.
        :param args:
            Additional positional arguments. The pymoo wrapper of this
            class passes ``n_cpu`` (number of logical processors to run
            the optimization on) as the first of these arguments.
        :raises NotImplementedError:
            Always, unless the function is overloaded.
        """
        raise NotImplementedError(f'{self.__class__.__name__}.mp_obj function is not defined')

    @property
    def supported_frameworks(self):
        """
        List with all frameworks supported by this
        wrapper class.
        """
        return ["scipy_minimize",
                "scipy_differential_evolution",
                "dlib_minimize",
                "pymoo",
                "bayesian_optimization"]

    @property
    def working_directory(self) -> Path:
        """The current working directory"""
        return self._working_directory

    @working_directory.setter
    def working_directory(self, working_directory: Union[Path, str]):
        """
        Set current working directory. The directory is created
        if it does not exist yet. Passing None unsets the directory.
        """
        if working_directory is None:
            self._working_directory = None
            return
        # Path() accepts str as well as any other path-like object
        working_directory = Path(working_directory)
        os.makedirs(working_directory, exist_ok=True)
        self._working_directory = working_directory

    @property
    def cd(self) -> Path:
        """Deprecated alias of working_directory."""
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.",
                      category=DeprecationWarning,
                      stacklevel=2)
        return self.working_directory

    @cd.setter
    def cd(self, cd: Union[Path, str]):
        """Deprecated alias for setting working_directory."""
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.",
                      category=DeprecationWarning,
                      stacklevel=2)
        self.working_directory = cd

    @property
    def bounds(self) -> List[Union[Tuple, List]]:
        """The boundaries of the optimization problem."""
        return self._bounds

    @bounds.setter
    def bounds(self, bounds):
        """Set the boundaries to the optimization variables"""
        self._bounds = bounds

    def optimize(self, framework, method=None, n_cpu=1, **kwargs):
        """
        Perform the optimization based on the given method and framework.

        :param str framework:
            The framework (python module) you want to use to perform the optimization.
            Currently, "scipy_minimize", "dlib_minimize", "scipy_differential_evolution",
            "pymoo" and "bayesian_optimization" are supported options.
            To further inform yourself about these frameworks, please see:
            - `dlib <http://dlib.net/python/index.html>`_
            - `scipy minimize <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html>`_
            - `scipy differential evolution <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.differential_evolution.html>`_
            - `pymoo <https://pymoo.org/index.html>`_
            - `bayesian optimization <https://bayesian-optimization.github.io/BayesianOptimization/index.html>`_
        :param str method:
            The method you pass depends on the methods available in the framework
            you chose when setting up the class. Some frameworks don't require a
            method, as only one exists. This is the case for dlib. For any framework
            with different methods, you must provide one.
            For the scipy.differential_evolution function, method is equal to the
            strategy.
            For the pymoo function, method is equal to the
            algorithm.
        :param int n_cpu:
            Number of parallel processes used for the evaluation.
            Ignored if the framework-method combination does not
            support multi-processing.

        Keyword arguments:
            Depending on the framework an method you use, you can fine-tune the
            optimization tool using extra arguments. We refer to the documentation of
            each framework for a listing of what parameters are supported and how
            to set them.
            E.g. For scipy.optimize.minimize one could
            add "tol=1e-3" as a kwarg.

        :return: res
            Optimization result.
        :raises TypeError:
            If the framework is not supported.
        :raises ValueError:
            If the framework requires a method but none is given.
        """
        # Choose the framework
        minimize_func, requires_method = self._choose_framework(framework)
        if method is None and requires_method:
            raise ValueError(f"{framework} requires a method, but None is "
                             f"provided. Please choose one.")
        # Perform minimization
        return minimize_func(method=method, n_cpu=n_cpu, **kwargs)

    def _choose_framework(self, framework):
        """
        Function to select the functions for optimization
        and for executing said functions.

        :param str framework:
            String for selection of the relevant function. Supported options are:
            - scipy_minimize
            - dlib_minimize
            - scipy_differential_evolution
            - pymoo
            - bayesian_optimization

        :return: tuple
            The function performing the minimization and a boolean
            indicating whether the framework requires a method.
        :raises TypeError:
            If the framework is not supported.
        """
        # Maps framework name -> (minimize function, requires a method?)
        dispatch = {
            "scipy_minimize": (self._scipy_minimize, True),
            "dlib_minimize": (self._dlib_minimize, False),
            "scipy_differential_evolution": (self._scipy_differential_evolution, True),
            "pymoo": (self._pymoo, True),
            "bayesian_optimization": (self._bayesian_optimization, False),
        }
        key = framework.lower()
        if key not in dispatch:
            raise TypeError(f"Given framework {framework} is currently not supported.")
        return dispatch[key]

    def _merge_bayesian_kwargs(self, user_kwargs):
        """
        Merge the user kwargs into the default config of the bayesian
        optimization and resolve the deprecated, misspelled option
        ``allow_dublicate_points``.

        :param dict user_kwargs:
            Keyword arguments as passed by the user.
        :return: dict
            Defaults updated with the user kwargs. The misspelled key is
            removed. Its value is applied to ``allow_duplicate_points``
            unless the user also passed the correctly spelled option.
        """
        merged = self.get_default_config(framework="bayesian_optimization")
        merged.update(user_kwargs)
        if "allow_dublicate_points" in merged:
            warnings.warn(
                "'allow_dublicate_points' is a typo and deprecated. "
                "Use 'allow_duplicate_points' instead.",
                FutureWarning,
                # One level more than usual, as this helper adds a frame
                stacklevel=3,
            )
            legacy_value = merged.pop("allow_dublicate_points")
            # The defaults always contain the correct key. Hence, check the
            # user input (not the merged dict) to decide who wins.
            if "allow_duplicate_points" not in user_kwargs:
                merged["allow_duplicate_points"] = legacy_value
        return merged

    def _bayesian_optimization(self, method=None, n_cpu=1, **kwargs):
        """
        Possible kwargs for the bayesian_optimization function with default values:

        random_state = 42
        allow_duplicate_points = True
        init_points = 5
        n_iter = 25
        kind_of_utility_function = "ei"
        xi = 0.1
        kappa = 2.576
        verbose = False

        For an explanation of what the parameters do, we refer to the documentation of
        the bayesian optimization package:
        https://bayesian-optimization.github.io/BayesianOptimization/index.html

        Additionally, a pre-built acquisition function instance can be passed via
        the ``acquisition_function`` kwarg. If given, it takes precedence over
        ``kind_of_utility_function``/``xi``/``kappa``. A custom gaussian process
        can be passed via the ``gp`` kwarg.

        ``method`` and ``n_cpu`` are not used by this framework.

        :return: res
            Named tuple with the fields ``x`` (best parameters as np.array) and
            ``fun`` (objective value at ``x``).
        """
        default_kwargs = self._merge_bayesian_kwargs(kwargs)

        try:
            from bayes_opt import BayesianOptimization
        except ImportError as error:
            raise ImportError("Please install bayesian-optimization to use "
                              "the bayesian_optimization function.") from error

        # Figure out which API version we're dealing with. In >=2.0 the old
        # UtilityFunction was removed and replaced by the acquisition-function
        # classes in bayes_opt.acquisition.
        try:
            from bayes_opt import acquisition as _bo_acquisition
            _new_api = True
        except ImportError:
            _bo_acquisition = None
            _new_api = False

        try:
            if self.bounds is None:
                raise ValueError("For the bayesian optimization approach, you need to specify "
                                 "boundaries. Currently, no bounds are specified.")

            pbounds = {f"x{n}": i for n, i in enumerate(self.bounds)}

            # Resolve the acquisition function. Users can pass a ready-made instance
            # via `acquisition_function` (new-API only); otherwise build it from
            # `kind_of_utility_function` + `xi`/`kappa`.
            acq_function_instance = default_kwargs.get("acquisition_function", None)
            kind = default_kwargs["kind_of_utility_function"].lower()
            xi = default_kwargs["xi"]
            kappa = default_kwargs["kappa"]
            random_state = default_kwargs["random_state"]

            maximize_kwargs = {
                "init_points": default_kwargs["init_points"],
                "n_iter": default_kwargs["n_iter"],
            }

            if _new_api:
                if acq_function_instance is None:
                    if kind in ("ei", "expected_improvement"):
                        acq_function_instance = _bo_acquisition.ExpectedImprovement(xi=xi)
                    elif kind in ("poi", "probability_of_improvement"):
                        acq_function_instance = _bo_acquisition.ProbabilityOfImprovement(xi=xi)
                    elif kind in ("ucb", "upper_confidence_bound"):
                        acq_function_instance = _bo_acquisition.UpperConfidenceBound(kappa=kappa)
                    else:
                        raise ValueError(
                            f"Unknown kind_of_utility_function '{kind}'. "
                            "Supported: 'ei', 'poi', 'ucb' (or pass a ready-made "
                            "instance via the 'acquisition_function' kwarg)."
                        )
                optimizer = BayesianOptimization(
                    f=self._bayesian_opt_obj,
                    pbounds=pbounds,
                    acquisition_function=acq_function_instance,
                    random_state=random_state,
                    allow_duplicate_points=default_kwargs["allow_duplicate_points"],
                    verbose=default_kwargs["verbose"],
                )
            else:
                # Legacy path for bayesian-optimization < 2.0
                warnings.warn(
                    "You are using bayesian-optimization < 2.0. Support for this "
                    "version is deprecated and will be removed in the future. "
                    "Please upgrade to bayesian-optimization >= 2.0.",
                    FutureWarning,
                    stacklevel=2,
                )
                from bayes_opt.util import UtilityFunction
                optimizer = BayesianOptimization(
                    f=self._bayesian_opt_obj,
                    pbounds=pbounds,
                    random_state=random_state,
                    allow_duplicate_points=default_kwargs["allow_duplicate_points"],
                    verbose=default_kwargs["verbose"],
                )
                # In the legacy API, the acquisition function is an argument of maximize
                maximize_kwargs["acquisition_function"] = UtilityFunction(
                    kind=kind, xi=xi, kappa=kappa
                )

            gp = default_kwargs.get("gp", None)
            if gp is not None:
                optimizer._gp = gp

            optimizer.maximize(**maximize_kwargs)

            res = optimizer.max
            # Build the result in the order of the bounds, independent of the
            # order in which the library returns the parameter names.
            x_res = np.array([res["params"][key] for key in pbounds])
            # The library maximizes the negative objective, revert the sign
            f_res = -res["target"]
            res_tuple = namedtuple("res_tuple", "x fun")
            return res_tuple(x=x_res, fun=f_res)
        except (KeyboardInterrupt, Exception) as error:
            # pylint: disable=inconsistent-return-statements
            self._handle_error(error)

    def _bayesian_opt_obj(self, **kwargs):
        """
        This function is needed as the signature for the Bayesian-optimization
        is different than the standard signature. The Bayesian-optimization gives
        keyword arguments for every parameter and only maximizes, therefore we
        will maximize the negative objective function value.

        :param kwargs:
            One keyword argument per optimization variable, named
            ``x0``, ``x1``, ... matching the index in ``self.bounds``.
        :return: float
            Negative value of ``self.obj``.
        """
        try:
            # Order by index and not by the order of the keyword arguments.
            # A lexicographic ordering of the names (x0, x1, x10, x2, ...)
            # would otherwise scramble the parameter vector.
            ordered = [kwargs[f"x{idx}"] for idx in range(len(kwargs))]
        except KeyError:
            # Names do not follow the x<idx> scheme, keep the given order
            ordered = list(kwargs.values())
        return -self.obj(np.array(ordered))

    def _scipy_minimize(self, method, n_cpu=1, **kwargs):
        """
        Possible kwargs for the scipy minimize function with default values:

        x0: Required
        tol = None
        options = None
        constraints = None
        jac = None
        hess = None
        hessp = None

        ``n_cpu`` is not used by this framework.

        :return: res
            Result object as returned by scipy.optimize.minimize.
        :raises KeyError:
            If no initial guess ``x0`` is given.
        """
        default_kwargs = self.get_default_config(framework="scipy_minimize")
        default_kwargs.update(kwargs)
        try:
            import scipy.optimize as opt
        except ImportError as error:
            raise ImportError("Please install scipy to use "
                              "the minimize_scipy function.") from error

        try:
            if "x0" not in kwargs:
                raise KeyError("An initial guess (x0) is required "
                               "for scipy.minimize. You passed None")
            return opt.minimize(
                fun=self.obj,
                x0=kwargs["x0"],
                method=method,
                jac=default_kwargs["jac"],
                hess=default_kwargs["hess"],
                hessp=default_kwargs["hessp"],
                bounds=self.bounds,
                constraints=default_kwargs["constraints"],
                tol=default_kwargs["tol"],
                options=default_kwargs["options"]
            )
        except (KeyboardInterrupt, Exception) as error:
            # pylint: disable=inconsistent-return-statements
            self._handle_error(error)

    def _dlib_minimize(self, method=None, n_cpu=1, **kwargs):
        """
        Possible kwargs for the dlib minimize function with default values:

        is_integer_variable = None (no variable is treated as integer)
        solver_epsilon = 0
        num_function_calls = int(1e9)

        ``method`` and ``n_cpu`` are not used by this framework.

        :return: res
            Named tuple with the fields ``x`` and ``fun``.
        """
        default_kwargs = self.get_default_config(framework="dlib_minimize")
        default_kwargs.update(kwargs)
        try:
            import dlib
        except ImportError as error:
            raise ImportError("Please install dlib to use the minimize_dlib function.") from error
        try:
            # Same check as for the other frameworks which require bounds.
            # Without it, the slicing below fails with an unclear IndexError.
            if self.bounds is None:
                raise ValueError("For dlib, you need to specify "
                                 "boundaries. Currently, no bounds are specified.")
            _bounds_2d = np.array(self.bounds)
            _bound_min = list(_bounds_2d[:, 0])
            _bound_max = list(_bounds_2d[:, 1])
            is_integer_variable = default_kwargs.get("is_integer_variable")
            if is_integer_variable is None:
                is_integer_variable = [False] * len(_bound_max)

            # This check is only necessary as the error-messages from dlib are quite indirect.
            # Any new user would not get that these parameters cause the error.
            for key in ["solver_epsilon", "num_function_calls"]:
                value = kwargs.get(key)
                if value is not None and not isinstance(value, (float, int)):
                    raise TypeError(
                        f"Given {key} is of type {type(value).__name__} but "
                        f"should be type float or int"
                    )

            x_res, f_res = dlib.find_min_global(
                f=self._dlib_obj,
                bound1=_bound_min,
                bound2=_bound_max,
                is_integer_variable=is_integer_variable,
                num_function_calls=int(default_kwargs["num_function_calls"]),
                solver_epsilon=float(default_kwargs["solver_epsilon"])
            )
            res_tuple = namedtuple("res_tuple", "x fun")
            return res_tuple(x=x_res, fun=f_res)
        except (KeyboardInterrupt, Exception) as error:
            # pylint: disable=inconsistent-return-statements
            self._handle_error(error)

    def _scipy_differential_evolution(self, method="best1bin", n_cpu=1, **kwargs):
        """
        Possible kwargs for the scipy differential evolution function
        with default values:

        maxiter = 1000
        popsize = 15
        tol = 0.01
        mutation = (0.5, 1)
        recombination = 0.7
        seed = None
        polish = True
        init = 'latinhypercube'
        atol = 0

        ``method`` is passed as ``strategy``. ``n_cpu`` is not used
        by this framework.

        :return: res
            Result object as returned by scipy.optimize.differential_evolution.
        """
        default_kwargs = self.get_default_config(framework="scipy_differential_evolution")
        default_kwargs.update(kwargs)
        try:
            import scipy.optimize as opt
        except ImportError as error:
            raise ImportError("Please install scipy to use the minimize_scipy function.") from error

        try:
            if self.bounds is None:
                raise ValueError("For the differential evolution approach, you need to specify "
                                 "boundaries. Currently, no bounds are specified.")

            return opt.differential_evolution(
                func=self.obj,
                bounds=self.bounds,
                strategy=method,
                maxiter=default_kwargs["maxiter"],
                popsize=default_kwargs["popsize"],
                tol=default_kwargs["tol"],
                mutation=default_kwargs["mutation"],
                recombination=default_kwargs["recombination"],
                seed=default_kwargs["seed"],
                disp=False,  # We have our own logging
                polish=default_kwargs["polish"],
                init=default_kwargs["init"],
                atol=default_kwargs["atol"]
            )
        except (KeyboardInterrupt, Exception) as error:
            # pylint: disable=inconsistent-return-statements
            self._handle_error(error)

    def _pymoo(self, method="GA", n_cpu=1, **kwargs):
        """
        Possible kwargs for the pymoo minimize function with default values:

        n_gen=1000 (only used if termination is None)
        termination=None
        seed=1
        verbose=False
        display=None
        callback=None
        save_history=False
        copy_algorithm=False
        copy_termination=False

        All further kwargs are passed to the constructor of the
        algorithm selected via ``method``.

        :param str method:
            Name of the pymoo algorithm (case-insensitive), e.g. "GA" or "NSGA2".
        :param int n_cpu:
            If greater than 1, ``self.mp_obj`` is used to evaluate the
            population, else ``self.obj`` is called for every individual.

        :return: res
            Named tuple with the fields ``x`` and ``fun``.
        :raises ValueError:
            If the method is not supported or no bounds are specified.
        """
        default_kwargs = self.get_default_config(framework="pymoo")
        default_kwargs.update(kwargs)

        try:
            from pymoo.optimize import minimize
            from pymoo.problems.single import Problem
            from pymoo.algorithms.moo.ctaea import CTAEA
            from pymoo.algorithms.moo.moead import MOEAD
            from pymoo.algorithms.moo.nsga2 import NSGA2
            from pymoo.algorithms.moo.nsga3 import NSGA3
            from pymoo.algorithms.moo.rnsga2 import RNSGA2
            from pymoo.algorithms.moo.rnsga3 import RNSGA3
            from pymoo.algorithms.soo.nonconvex.de import DE
            from pymoo.algorithms.soo.nonconvex.ga import GA
            from pymoo.algorithms.moo.unsga3 import UNSGA3
            from pymoo.algorithms.soo.nonconvex.brkga import BRKGA
            from pymoo.algorithms.soo.nonconvex.pso import PSO
        except ImportError as error:
            raise ImportError("Please install pymoo to use this function.") from error

        # The factory module and the old module names only exist in older
        # pymoo versions. If their import fails, the newer layout is used.
        pymoo_version_greater_050 = True
        legacy_factories = {}
        try:
            from pymoo.factory import get_sampling, get_mutation, get_crossover, get_selection
            from pymoo.algorithms.soo.nonconvex.nelder_mead import NelderMead
            from pymoo.algorithms.soo.nonconvex.pattern_search import PatternSearch
            pymoo_version_greater_050 = False
            legacy_factories = {
                "selection": get_selection,
                "crossover": get_crossover,
                "sampling": get_sampling,
                "mutation": get_mutation,
            }
        except ImportError:
            from pymoo.algorithms.soo.nonconvex.nelder import NelderMead
            from pymoo.algorithms.soo.nonconvex.pattern import PatternSearch

        pymoo_algorithms = {
            "ga": GA,
            "brkga": BRKGA,
            "de": DE,
            "nelder-mead": NelderMead,
            "pattern-search": PatternSearch,
            "pso": PSO,
            "nsga2": NSGA2,
            "rnsga2": RNSGA2,
            "nsga3": NSGA3,
            "unsga3": UNSGA3,
            "rnsga3": RNSGA3,
            "moead": MOEAD,
            "ctaea": CTAEA,
        }

        if method.lower() not in pymoo_algorithms:
            raise ValueError(f"Given method {method} is currently not supported. Please choose one of the "
                             "following: " + ", ".join(pymoo_algorithms.keys()))

        class EBCPYProblem(Problem):
            """Construct wrapper problem class."""

            def __init__(self, ebcpy_class: Optimizer):
                self.ebcpy_class = ebcpy_class
                super().__init__(
                    n_var=len(ebcpy_class.bounds),
                    n_obj=1,
                    n_constr=0,
                    xl=np.array([bound[0] for bound in ebcpy_class.bounds]),
                    xu=np.array([bound[1] for bound in ebcpy_class.bounds])
                )

            def _evaluate(self, x, out, *args, **kwargs):
                if n_cpu > 1:
                    out["F"] = self.ebcpy_class.mp_obj(x, n_cpu, *args)
                else:
                    # Pass the parameters positionally. A keyword argument
                    # would collide with the first entry of args.
                    out["F"] = np.array([self.ebcpy_class.obj(_x, *args) for _x in x])

        try:
            if self.bounds is None:
                raise ValueError("For pymoo, you need to specify "
                                 "boundaries. Currently, no bounds are specified.")

            # Remove all options of minimize from the kwargs, as
            # everything remaining is passed to the algorithm.
            termination = default_kwargs.pop("termination")
            # Always remove n_gen, else it would be passed to the
            # algorithm if a custom termination is given.
            n_gen = default_kwargs.pop("n_gen")
            if termination is None:
                termination = ("n_gen", n_gen)
            seed = default_kwargs.pop("seed")
            verbose = default_kwargs.pop("verbose")
            save_history = default_kwargs.pop("save_history")
            copy_algorithm = default_kwargs.pop("copy_algorithm")
            copy_termination = default_kwargs.pop("copy_termination")
            callback = default_kwargs.pop("callback")
            display = default_kwargs.pop("display")

            if not pymoo_version_greater_050:
                string_keys = [key for key in legacy_factories
                               if isinstance(default_kwargs.get(key), str)]
                if string_keys:
                    warnings.warn(
                        "Support for pymoo<0.6 string arguments is deprecated and will be removed in the future. "
                        "Please import the classes yourself and pass the objects directly to the kwargs.",
                        FutureWarning,
                        stacklevel=2,
                    )
                # Only names are converted. Objects are passed on unchanged,
                # as requested by the warning above.
                for key in string_keys:
                    default_kwargs[key] = legacy_factories[key](name=default_kwargs[key])

            algorithm = pymoo_algorithms[method.lower()](**default_kwargs)

            minimize_kwargs = {
                "problem": EBCPYProblem(ebcpy_class=self),
                "algorithm": algorithm,
                "termination": termination,
                "seed": seed,
                "verbose": verbose,
                "save_history": save_history,
                "copy_algorithm": copy_algorithm,
                "copy_termination": copy_termination,
            }
            # Only pass these options if specified by the user
            if callback is not None:
                minimize_kwargs["callback"] = callback
            if display is not None:
                minimize_kwargs["display"] = display

            res = minimize(**minimize_kwargs)
            res_tuple = namedtuple("res_tuple", "x fun")
            return res_tuple(x=res.X, fun=res.F[0])
        except (KeyboardInterrupt, Exception) as error:
            # pylint: disable=inconsistent-return-statements
            self._handle_error(error)

    def _dlib_obj(self, *args):
        """
        This function is needed as the signature for the dlib-obj
        is different than the standard signature. dlib will parse a number of
        parameters

        :param args:
            One positional argument per optimization variable.
        :return:
            Value of ``self.obj`` for the parameters as np.array.
        """
        return self.obj(np.array(args))

    def _handle_error(self, error):
        """
        Function to handle the case when an optimization step fails (e.g. simulation-fail).
        The parameter set which caused the failure and the best iterate until this point
        are of interest for the user in such case.

        :param error:
            Any Exception that may occur
        :raises error:
            The given error is always re-raised after logging.
        """
        self.logger.error(f"Parameter set which caused the failure: {self._current_iterate}")
        self.logger.error("Current best objective and parameter set:")
        self.logger.error("\n".join([f"{key}: {value}"
                                     for key, value in self._current_best_iterate.items()]))
        raise error

    @staticmethod
    def get_default_config(framework: str) -> dict:
        """
        Return the default config or kwargs for the
        given framework.

        The default values are extracted of the corresponding
        framework directly.

        :param str framework:
            Name of the framework (case-insensitive).
        :return: dict
            A new dictionary with the default kwargs. Empty
            if the framework is unknown.
        """
        # The dictionaries are created on every call, as callers
        # modify the returned dictionary in-place.
        defaults = {
            "scipy_minimize": {
                "tol": None,
                "options": None,
                "constraints": None,
                "jac": None,
                "hess": None,
                "hessp": None,
            },
            "dlib_minimize": {
                "num_function_calls": int(1e9),
                "solver_epsilon": 0,
            },
            "scipy_differential_evolution": {
                "maxiter": 1000,
                "popsize": 15,
                "tol": 0.01,
                "mutation": (0.5, 1),
                "recombination": 0.7,
                "seed": None,
                "polish": True,
                "init": 'latinhypercube',
                "atol": 0,
            },
            "pymoo": {
                "n_gen": 1000,
                "termination": None,
                "seed": 1,
                "verbose": False,
                "display": None,
                "callback": None,
                "save_history": False,
                "copy_algorithm": False,
                "copy_termination": False,
            },
            "bayesian_optimization": {
                "random_state": 42,
                "allow_duplicate_points": True,
                "init_points": 5,
                "n_iter": 25,
                "kind_of_utility_function": "ei",
                "xi": 0.1,
                "kappa": 2.576,
                "verbose": False,
            },
        }
        return defaults.get(framework.lower(), {})