# Coverage for ebcpy/optimization.py: 86% (219 statements)
# Report generated by coverage.py v7.4.4, created at 2024-09-19 12:21 +0000
"""Base-module for the whole optimization package.
Used to define Base-Classes such as Optimizer and
Calibrator."""
import os
import warnings
from abc import abstractmethod
from collections import namedtuple
from pathlib import Path
from typing import List, Tuple, Union

import numpy as np

from ebcpy.utils import setup_logger
# pylint: disable=import-outside-toplevel
# pylint: disable=broad-except
class Optimizer:
    """
    Base class for optimization in ebcpy. All classes
    performing optimization tasks must inherit from this
    class.
    The main feature of this class is the common interface
    for different available solvers in python. This makes the
    testing of different solvers and methods easier.
    For available frameworks/solvers, check the function
    self.optimize().


    :param str,Path working_directory:
        Directory for storing all output of optimization via a logger.
    :keyword list bounds:
        The boundaries for the optimization variables.
    """

    # NOTE: the attributes below are defined on the class, so the mutable
    # ones (dict / list) are shared by all instances until rebound.

    # Used to display number of obj-function-calls
    _counter = 0
    # Used to access the current parameter set if an optimization-step fails
    _current_iterate = np.array([])
    # Used to access the best iterate if an optimization step fails
    _current_best_iterate = {"Objective": np.inf}
    # List storing every objective value for plotting and logging.
    # Can be used, but will enlarge runtime
    _obj_his = []

    def __init__(self, working_directory: Union[Path, str] = None, **kwargs):
        """
        Instantiate class parameters.

        :param str,Path working_directory:
            Directory used for the logger output. May be None.
        :keyword str,Path cd:
            Deprecated alias of ``working_directory``. Only used if
            ``working_directory`` is None.
        :keyword list bounds:
            The boundaries for the optimization variables.
        """
        if working_directory is None and "cd" in kwargs:
            warnings.warn("cd was renamed to working_directory in all classes. Use working_directory instead.", category=DeprecationWarning)
            self.working_directory = kwargs["cd"]
        elif working_directory is None:
            self._working_directory = None
        else:
            self.working_directory = working_directory

        self.logger = setup_logger(working_directory=self.working_directory, name=self.__class__.__name__)
        # Set kwargs
        self.bounds = kwargs.get("bounds", None)

    @abstractmethod
    def obj(self, xk, *args):
        """
        Base objective function. Overload this function and create your own
        objective function. Make sure that the return value is a scalar.
        Furthermore, the parameter vector xk is always a numpy array.

        :param np.array xk:
            Array with parameters for optimization

        :return: float result:
            A scalar (float/ 1d) value for the optimization framework.
        :raises NotImplementedError:
            Always, unless the method is overloaded.
        """
        raise NotImplementedError(f'{self.__class__.__name__}.obj function is not defined')

    @abstractmethod
    def mp_obj(self, x, *args):
        """
        Objective function for Multiprocessing.

        :param np.array x:
            Array with parameters for optimization.
            Shape of the array is (number_of_evaluations x number_of_variables).
            For instance, optimizing 10 variables and evaluating
            900 objectives in parallel, the shape would be 900 x 10.
        :param int n_cpu:
            Number of logical Processors to run optimization on.
            Passed as the first entry of ``*args``.
        :raises NotImplementedError:
            Always, unless the method is overloaded.
        """
        # The message names mp_obj (not obj) so users know which method to overload.
        raise NotImplementedError(f'{self.__class__.__name__}.mp_obj function is not defined')

    @property
    def supported_frameworks(self):
        """
        List with all frameworks supported by this
        wrapper class.
        """
        return ["scipy_minimize",
                "scipy_differential_evolution",
                "dlib_minimize",
                "pymoo",
                "bayesian_optimization"]

    @property
    def working_directory(self) -> Path:
        """The current working directory"""
        return self._working_directory

    @working_directory.setter
    def working_directory(self, working_directory: Union[Path, str]):
        """
        Set current working directory.

        The directory is created if it does not exist yet. Passing None
        stores None without touching the file system.
        """
        if working_directory is None:
            # os.makedirs(None) would raise a TypeError, e.g. for Optimizer(cd=None).
            self._working_directory = None
            return
        if isinstance(working_directory, str):
            working_directory = Path(working_directory)
        os.makedirs(working_directory, exist_ok=True)
        self._working_directory = working_directory

    @property
    def cd(self) -> Path:
        """Deprecated alias of ``working_directory``."""
        warnings.warn("cd was renamed to working_directory in all classes. Use working_directory instead instead.", category=DeprecationWarning)
        return self.working_directory

    @cd.setter
    def cd(self, cd: Union[Path, str]):
        """Deprecated alias for setting ``working_directory``."""
        warnings.warn("cd was renamed to working_directory in all classes. Use working_directory instead instead.", category=DeprecationWarning)
        self.working_directory = cd

    @property
    def bounds(self) -> List[Union[Tuple, List]]:
        """The boundaries of the optimization problem."""
        return self._bounds

    @bounds.setter
    def bounds(self, bounds):
        """Set the boundaries to the optimization variables"""
        self._bounds = bounds

    def optimize(self, framework, method=None, n_cpu=1, **kwargs):
        """
        Perform the optimization based on the given method and framework.

        :param str framework:
            The framework (python module) you want to use to perform the optimization.
            Currently, "scipy_minimize", "dlib_minimize", "scipy_differential_evolution",
            "pymoo" and "bayesian_optimization" are supported options.
            To further inform yourself about these frameworks, please see:
            - `dlib <http://dlib.net/python/index.html>`_
            - `scipy minimize <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html>`_
            - `scipy differential evolution <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.differential_evolution.html>`_
            - `pymoo <https://pymoo.org/index.html>`_
            - `bayesian optimization <https://bayesian-optimization.github.io/BayesianOptimization/index.html>`_
        :param str method:
            The method you pass depends on the methods available in the framework
            you chose when setting up the class. Some frameworks don't require a
            method, as only one exists. This is the case for dlib. For any framework
            with different methods, you must provide one.
            For the scipy.differential_evolution function, method is equal to the
            strategy.
            For the pymoo function, method is equal to the
            algorithm.
        :param int n_cpu:
            Number of parallel processes used for the evaluation.
            Ignored if the framework-method combination does not
            support multi-processing.

        Keyword arguments:
            Depending on the framework and method you use, you can fine-tune the
            optimization tool using extra arguments. We refer to the documentation of
            each framework for a listing of what parameters are supported and how
            to set them.
            E.g. For scipy.optimize.minimize one could
            add "tol=1e-3" as a kwarg.

        :return: res
            Optimization result.
        :raises TypeError:
            If the framework is not supported.
        :raises ValueError:
            If the framework requires a method but None is given.
        """
        # Choose the framework
        minimize_func, requires_method = self._choose_framework(framework)
        if method is None and requires_method:
            raise ValueError(f"{framework} requires a method, but None is "
                             f"provided. Please choose one.")
        # Perform minimization
        return minimize_func(method=method, n_cpu=n_cpu, **kwargs)

    def _choose_framework(self, framework):
        """
        Function to select the functions for optimization
        and for executing said functions.

        :param str framework:
            String for selection of the relevant function. Supported options are:
            - scipy_minimize
            - dlib_minimize
            - scipy_differential_evolution
            - pymoo
            - bayesian_optimization

        :return: tuple
            The function performing the minimization and a bool
            indicating whether a method has to be specified.
        :raises TypeError:
            If the framework is not supported.
        """
        available = {
            "scipy_minimize": (self._scipy_minimize, True),
            "dlib_minimize": (self._dlib_minimize, False),
            "scipy_differential_evolution": (self._scipy_differential_evolution, True),
            "pymoo": (self._pymoo, True),
            "bayesian_optimization": (self._bayesian_optimization, False),
        }
        selected = available.get(framework.lower())
        if selected is None:
            raise TypeError(f"Given framework {framework} is currently not supported.")
        return selected

    @staticmethod
    def _order_parameter_names(names):
        """
        Return the given parameter names ordered by variable index.

        Names following the pattern ``x<index>`` (as created in
        _bayesian_optimization) are sorted numerically, so that "x10"
        comes after "x2". If any name deviates from this pattern,
        the given order is kept.
        """
        names = list(names)
        if all(name[:1] == "x" and name[1:].isdecimal() for name in names):
            names.sort(key=lambda name: int(name[1:]))
        return names

    def _bayesian_optimization(self, method=None, n_cpu=1, **kwargs):
        """
        Possible kwargs for the bayesian_optimization function with default values:

        random_state = 42
        allow_dublicate_points = True
        init_points = 5
        n_iter = 25
        kind_of_utility_function = "ei"
        xi = 0.1
        verbose = False

        The correctly spelled ``allow_duplicate_points`` is accepted as well
        and takes precedence over ``allow_dublicate_points``.
        Optionally, ``gp`` can be passed to replace the gaussian process
        regressor of the optimizer.

        For an explanation of what the parameters do, we refer to the documentation of
        the bayesian optimization package:
        https://bayesian-optimization.github.io/BayesianOptimization/index.html
        """
        default_kwargs = self.get_default_config(framework="bayesian_optimization")
        default_kwargs.update(kwargs)

        try:
            from bayes_opt import BayesianOptimization
            from bayes_opt.util import UtilityFunction
        except ImportError as error:
            raise ImportError("Please install bayesian-optimization to use "
                              "the bayesian_optimization function.") from error

        try:
            if self.bounds is None:
                raise ValueError("For the bayesian optimization approach, you need to specify "
                                 "boundaries. Currently, no bounds are specified.")

            pbounds = {f"x{n}": i for n, i in enumerate(self.bounds)}

            # Keep the legacy (misspelled) key, but honor the correct spelling if given.
            allow_duplicates = default_kwargs.get(
                "allow_duplicate_points",
                default_kwargs["allow_dublicate_points"]
            )

            optimizer = BayesianOptimization(
                f=self._bayesian_opt_obj,
                pbounds=pbounds,
                random_state=default_kwargs["random_state"],
                allow_duplicate_points=allow_duplicates,
                verbose=default_kwargs["verbose"]
            )

            gp = default_kwargs.get("gp", None)
            if gp is not None:
                optimizer._gp = gp

            acq_function = UtilityFunction(
                kind=default_kwargs["kind_of_utility_function"],
                xi=default_kwargs["xi"])

            optimizer.maximize(
                init_points=default_kwargs["init_points"],
                n_iter=default_kwargs["n_iter"],
                acquisition_function=acq_function
            )

            best = optimizer.max
            params = best["params"]
            # The params dict may be ordered alphabetically ("x10" before "x2").
            # Restore the order of self.bounds via the numeric index.
            x_res = np.array([params[name] for name in self._order_parameter_names(params)])
            # The negative objective was maximized, hence flip the sign back.
            f_res = -best["target"]
            res_tuple = namedtuple("res_tuple", "x fun")
            return res_tuple(x=x_res, fun=f_res)
        except (KeyboardInterrupt, Exception) as error:
            # pylint: disable=inconsistent-return-statements
            self._handle_error(error)

    def _bayesian_opt_obj(self, **kwargs):
        """
        This function is needed as the signature for the Bayesian-optimization
        is different than the standard signature. The Bayesian-optimization gives keyword arguments for
        every parameter and only maximizes, therefore we will maximize the negative objective function value.

        The keyword arguments are sorted by their numeric index before
        building the parameter vector, as they may arrive in alphabetical
        order ("x10" before "x2").
        """
        xk = np.array([kwargs[name] for name in self._order_parameter_names(kwargs)])
        return -self.obj(xk)

    def _scipy_minimize(self, method, n_cpu=1, **kwargs):
        """
        Possible kwargs for the scipy minimize function with default values:

        x0: Required
        tol = None
        options = None
        constraints = None
        jac = None
        hess = None
        hessp = None
        """
        default_kwargs = self.get_default_config(framework="scipy_minimize")
        default_kwargs.update(kwargs)
        try:
            import scipy.optimize as opt
        except ImportError as error:
            raise ImportError("Please install scipy to use "
                              "the minimize_scipy function.") from error

        try:
            if "x0" not in kwargs:
                raise KeyError("An initial guess (x0) is required "
                               "for scipy.minimize. You passed None")
            return opt.minimize(
                fun=self.obj,
                x0=kwargs["x0"],
                method=method,
                jac=default_kwargs["jac"],
                hess=default_kwargs["hess"],
                hessp=default_kwargs["hessp"],
                bounds=self.bounds,
                constraints=default_kwargs["constraints"],
                tol=default_kwargs["tol"],
                options=default_kwargs["options"]
            )
        except (KeyboardInterrupt, Exception) as error:
            # pylint: disable=inconsistent-return-statements
            self._handle_error(error)

    def _dlib_minimize(self, method=None, n_cpu=1, **kwargs):
        """
        Possible kwargs for the dlib minimize function with default values:

        is_integer_variable = None
        solver_epsilon = 0
        num_function_calls = int(1e9)
        """
        default_kwargs = self.get_default_config(framework="dlib_minimize")
        default_kwargs.update(kwargs)
        try:
            import dlib
        except ImportError as error:
            raise ImportError("Please install dlib to use the minimize_dlib function.") from error
        try:
            if self.bounds is None:
                # Without this check, indexing the bounds array below fails
                # with an error that does not point to the missing bounds.
                raise ValueError("For dlib, you need to specify "
                                 "boundaries. Currently, no bounds are specified.")
            _bounds_2d = np.array(self.bounds)
            _bound_min = list(_bounds_2d[:, 0])
            _bound_max = list(_bounds_2d[:, 1])
            if "is_integer_variable" not in kwargs:
                # By default, no variable is treated as an integer.
                is_integer_variable = [False] * len(_bound_max)
            else:
                is_integer_variable = kwargs["is_integer_variable"]

            # This check is only necessary as the error-messages from dlib are quite indirect.
            # Any new user would not get that these parameters cause the error.
            for key in ["solver_epsilon", "num_function_calls"]:
                value = kwargs.get(key)
                if value is not None and not isinstance(value, (float, int)):
                    raise TypeError(
                        f"Given {key} is of type {type(value).__name__} but "
                        f"should be type float or int"
                    )

            x_res, f_res = dlib.find_min_global(
                f=self._dlib_obj,
                bound1=_bound_min,
                bound2=_bound_max,
                is_integer_variable=is_integer_variable,
                num_function_calls=int(default_kwargs["num_function_calls"]),
                solver_epsilon=float(default_kwargs["solver_epsilon"])
            )
            res_tuple = namedtuple("res_tuple", "x fun")
            return res_tuple(x=x_res, fun=f_res)
        except (KeyboardInterrupt, Exception) as error:
            # pylint: disable=inconsistent-return-statements
            self._handle_error(error)

    def _scipy_differential_evolution(self, method="best1bin", n_cpu=1, **kwargs):
        """
        Possible kwargs for the scipy differential evolution function
        with default values:

        maxiter = 1000
        popsize = 15
        tol = 0.01
        mutation = (0.5, 1)
        recombination = 0.7
        seed = None
        polish = True
        init = 'latinhypercube'
        atol = 0
        """
        default_kwargs = self.get_default_config(framework="scipy_differential_evolution")
        default_kwargs.update(kwargs)
        try:
            import scipy.optimize as opt
        except ImportError as error:
            raise ImportError("Please install scipy to use the minimize_scipy function.") from error

        try:
            if self.bounds is None:
                raise ValueError("For the differential evolution approach, you need to specify "
                                 "boundaries. Currently, no bounds are specified.")

            return opt.differential_evolution(
                func=self.obj,
                bounds=self.bounds,
                strategy=method,
                maxiter=default_kwargs["maxiter"],
                popsize=default_kwargs["popsize"],
                tol=default_kwargs["tol"],
                mutation=default_kwargs["mutation"],
                recombination=default_kwargs["recombination"],
                seed=default_kwargs["seed"],
                disp=False,  # We have our own logging
                polish=default_kwargs["polish"],
                init=default_kwargs["init"],
                atol=default_kwargs["atol"]
            )
        except (KeyboardInterrupt, Exception) as error:
            # pylint: disable=inconsistent-return-statements
            self._handle_error(error)

    def _pymoo(self, method="GA", n_cpu=1, **kwargs):
        """
        Possible kwargs for the pymoo function with default values:

        n_gen=1000
        pop_size=50
        sampling="real_random"
        selection="random"
        crossover="real_sbx"
        mutation="real_pm"
        eliminate_duplicates=True
        n_offsprings=None
        termination=None
        seed=1
        verbose=False
        display=None
        callback=None
        save_history=False
        copy_algorithm=False
        copy_termination=False

        ``n_gen`` is only used to build the termination criterion if
        ``termination`` is None. All kwargs not consumed by the
        minimize call are passed to the algorithm selected via ``method``.
        """
        default_kwargs = self.get_default_config(framework="pymoo")
        default_kwargs.update(kwargs)

        try:
            from pymoo.optimize import minimize
            from pymoo.problems.single import Problem
            from pymoo.factory import get_sampling, get_mutation, get_crossover, get_selection
            from pymoo.algorithms.moo.ctaea import CTAEA
            from pymoo.algorithms.moo.moead import MOEAD
            from pymoo.algorithms.moo.nsga2 import NSGA2
            from pymoo.algorithms.moo.nsga3 import NSGA3
            from pymoo.algorithms.moo.rnsga2 import RNSGA2
            from pymoo.algorithms.moo.rnsga3 import RNSGA3
            from pymoo.algorithms.soo.nonconvex.de import DE
            from pymoo.algorithms.soo.nonconvex.ga import GA
            from pymoo.algorithms.moo.unsga3 import UNSGA3
            from pymoo.algorithms.soo.nonconvex.nelder_mead import NelderMead
            from pymoo.algorithms.soo.nonconvex.brkga import BRKGA
            from pymoo.algorithms.soo.nonconvex.pattern_search import PatternSearch
            from pymoo.algorithms.soo.nonconvex.pso import PSO

        except ImportError as error:
            raise ImportError("Please install pymoo to use this function.") from error

        pymoo_algorithms = {
            "ga": GA,
            "brkga": BRKGA,
            "de": DE,
            "nelder-mead": NelderMead,
            "pattern-search": PatternSearch,
            "pso": PSO,
            "nsga2": NSGA2,
            "rnsga2": RNSGA2,
            "nsga3": NSGA3,
            "unsga3": UNSGA3,
            "rnsga3": RNSGA3,
            "moead": MOEAD,
            "ctaea": CTAEA,
        }

        if method.lower() not in pymoo_algorithms:
            raise ValueError(f"Given method {method} is currently not supported. Please choose one of the "
                             "following: " + ", ".join(pymoo_algorithms))

        class EBCPYProblem(Problem):
            """Construct wrapper problem class."""

            def __init__(self,
                         ebcpy_class: Optimizer
                         ):
                self.ebcpy_class = ebcpy_class
                super().__init__(n_var=len(ebcpy_class.bounds),
                                 n_obj=1,
                                 n_constr=0,
                                 xl=np.array([bound[0] for bound in ebcpy_class.bounds]),
                                 xu=np.array([bound[1] for bound in ebcpy_class.bounds])
                                 )

            def _evaluate(self, x, out, *args, **kwargs):
                # n_cpu is taken from the enclosing _pymoo call.
                if n_cpu > 1:
                    out["F"] = self.ebcpy_class.mp_obj(x, n_cpu, *args)
                else:
                    out["F"] = np.array([self.ebcpy_class.obj(xk=_x, *args) for _x in x])

        try:
            if self.bounds is None:
                raise ValueError("For pymoo, you need to specify "
                                 "boundaries. Currently, no bounds are specified.")

            termination = default_kwargs.pop("termination")
            # n_gen only describes the termination. Always remove it so it is
            # never forwarded to the algorithm constructor below.
            n_gen = default_kwargs.pop("n_gen", None)
            if termination is None:
                termination = ("n_gen", n_gen)
            seed = default_kwargs.pop("seed")
            verbose = default_kwargs.pop("verbose")
            save_history = default_kwargs.pop("save_history")
            copy_algorithm = default_kwargs.pop("copy_algorithm")
            copy_termination = default_kwargs.pop("copy_termination")
            callback = default_kwargs.pop("callback")
            display = default_kwargs.pop("display")

            # Convert the string identifiers to the pymoo operator objects
            default_kwargs["sampling"] = get_sampling(name=default_kwargs["sampling"])
            default_kwargs["selection"] = get_selection(name=default_kwargs["selection"])
            default_kwargs["crossover"] = get_crossover(name=default_kwargs["crossover"])
            default_kwargs["mutation"] = get_mutation(name=default_kwargs["mutation"])

            algorithm = pymoo_algorithms[method.lower()](**default_kwargs)

            res = minimize(
                problem=EBCPYProblem(ebcpy_class=self),
                algorithm=algorithm,
                termination=termination,
                seed=seed,
                verbose=verbose,
                display=display,
                callback=callback,
                save_history=save_history,
                copy_algorithm=copy_algorithm,
                copy_termination=copy_termination,
            )
            res_tuple = namedtuple("res_tuple", "x fun")
            return res_tuple(x=res.X, fun=res.F[0])
        except (KeyboardInterrupt, Exception) as error:
            # pylint: disable=inconsistent-return-statements
            self._handle_error(error)

    def _dlib_obj(self, *args):
        """
        This function is needed as the signature for the dlib-obj
        is different than the standard signature. dlib will pass each
        parameter as a separate positional argument.
        """
        return self.obj(np.array(args))

    def _handle_error(self, error):
        """
        Function to handle the case when an optimization step fails (e.g. simulation-fail).
        The parameter set which caused the failure and the best iterate until this point
        are of interest for the user in such case.

        :param error:
            Any Exception that may occur
        :raises:
            The given error, after logging the current state.
        """
        self.logger.error(f"Parameter set which caused the failure: {self._current_iterate}")
        self.logger.error("Current best objective and parameter set:")
        self.logger.error("\n".join([f"{key}: {value}"
                                     for key, value in self._current_best_iterate.items()]))
        raise error

    @staticmethod
    def get_default_config(framework: str) -> dict:
        """
        Return the default config or kwargs for the
        given framework.

        A new dict is created on every call, so callers may
        modify the result. For unknown frameworks, an empty
        dict is returned.
        """
        # Built inside the function on purpose: callers update / pop from the result.
        configs = {
            "scipy_minimize": {
                "tol": None,
                "options": None,
                "constraints": None,
                "jac": None,
                "hess": None,
                "hessp": None
            },
            "dlib_minimize": {
                "num_function_calls": int(1e9),
                "solver_epsilon": 0
            },
            "scipy_differential_evolution": {
                "maxiter": 1000,
                "popsize": 15,
                "tol": 0.01,
                "mutation": (0.5, 1),
                "recombination": 0.7,
                "seed": None,
                "polish": True,
                "init": 'latinhypercube',
                "atol": 0
            },
            "pymoo": {
                "n_gen": 1000,
                "pop_size": 50,
                "sampling": "real_random",
                "selection": "random",
                "crossover": "real_sbx",
                "mutation": "real_pm",
                "eliminate_duplicates": True,
                "n_offsprings": None,
                "termination": None,
                "seed": 1,
                "verbose": False,
                "display": None,
                "callback": None,
                "save_history": False,
                "copy_algorithm": False,
                "copy_termination": False
            },
            "bayesian_optimization": {
                "random_state": 42,
                "allow_dublicate_points": True,
                "init_points": 5,
                "n_iter": 25,
                "kind_of_utility_function": "ei",
                "xi": 0.1,
                "verbose": False
            },
        }
        return configs.get(framework.lower(), {})