Coverage for ebcpy/optimization.py: 84%
223 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-20 12:54 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-20 12:54 +0000
1"""Base-module for the whole optimization pacakge.
2Used to define Base-Classes such as Optimizer and
3Calibrator."""
5import os
6from pathlib import Path
7import warnings
8from typing import List, Tuple, Union
9from collections import namedtuple
10from abc import abstractmethod
11import numpy as np
12from ebcpy.utils import setup_logger
13# pylint: disable=import-outside-toplevel
14# pylint: disable=broad-except
17class Optimizer:
18 """
19 Base class for optimization in ebcpy. All classes
20 performing optimization tasks must inherit from this
21 class.
22 The main feature of this class is the common interface
23 for different available solvers in python. This makes the
24 testing of different solvers and methods more easy.
25 For available frameworks/solvers, check the function
26 self.optimize().
29 :param str,Path working_directory:
30 Directory for storing all output of optimization via a logger.
31 :keyword list bounds:
32 The boundaries for the optimization variables.
33 """
35 # Used to display number of obj-function-calls
36 _counter = 0
37 # Used to access the current parameter set if an optimization-step fails
38 _current_iterate = np.array([])
39 # Used to access the best iterate if an optimization step fails
40 _current_best_iterate = {"Objective": np.inf}
41 # List storing every objective value for plotting and logging.
42 # Can be used, but will enlarge runtime
43 _obj_his = []
45 def __init__(self, working_directory: Union[Path, str] = None, **kwargs):
46 """Instantiate class parameters"""
47 if working_directory is None and "cd" in kwargs:
48 warnings.warn("cd was renamed to working_directory in all classes. Use working_directory instead.", category=DeprecationWarning)
49 self.working_directory = kwargs["cd"]
50 elif working_directory is None:
51 self._working_directory = None
52 else:
53 self.working_directory = working_directory
55 self.logger = setup_logger(working_directory=self.working_directory, name=self.__class__.__name__)
56 # Set kwargs
57 self.bounds = kwargs.get("bounds", None)
59 @abstractmethod
60 def obj(self, xk, *args):
61 """
62 Base objective function. Overload this function and create your own
63 objective function. Make sure that the return value is a scalar.
64 Furthermore, the parameter vector xk is always a numpy array.
66 :param np.array xk:
67 Array with parameters for optimization
69 :return: float result:
70 A scalar (float/ 1d) value for the optimization framework.
71 """
72 raise NotImplementedError(f'{self.__class__.__name__}.obj function is not defined')
74 @abstractmethod
75 def mp_obj(self, x, *args):
76 """
77 Objective function for Multiprocessing.
79 :param np.array x:
80 Array with parameters for optimization.
81 Shape of the array is (number_of_evaluations x number_of_variables).
82 For instance, optimizating 10 variables and evaluating
83 900 objectives in parallel, the shape would be 900 x 10.
84 :param int n_cpu:
85 Number of logical Processors to run optimization on.
86 """
87 raise NotImplementedError(f'{self.__class__.__name__}.obj function is not defined')
89 @property
90 def supported_frameworks(self):
91 """
92 List with all frameworks supported by this
93 wrapper class.
94 """
95 return ["scipy_minimize",
96 "scipy_differential_evolution",
97 "dlib_minimize",
98 "pymoo",
99 "bayesian_optimization"]
101 @property
102 def working_directory(self) -> Path:
103 """The current working directory"""
104 return self._working_directory
106 @working_directory.setter
107 def working_directory(self, working_directory: Union[Path, str]):
108 """Set current working directory"""
109 if isinstance(working_directory, str):
110 working_directory = Path(working_directory)
111 os.makedirs(working_directory, exist_ok=True)
112 self._working_directory = working_directory
114 @property
115 def cd(self) -> Path:
116 warnings.warn("cd was renamed to working_directory in all classes. Use working_directory instead instead.", category=DeprecationWarning)
117 return self.working_directory
119 @cd.setter
120 def cd(self, cd: Union[Path, str]):
121 warnings.warn("cd was renamed to working_directory in all classes. Use working_directory instead instead.", category=DeprecationWarning)
122 self.working_directory = cd
124 @property
125 def bounds(self) -> List[Union[Tuple, List]]:
126 """The boundaries of the optimization problem."""
127 return self._bounds
129 @bounds.setter
130 def bounds(self, bounds):
131 """Set the boundaries to the optimization variables"""
132 self._bounds = bounds
134 def optimize(self, framework, method=None, n_cpu=1, **kwargs):
135 """
136 Perform the optimization based on the given method and framework.
138 :param str framework:
139 The framework (python module) you want to use to perform the optimization.
140 Currently, "scipy_minimize", "dlib_minimize" and "scipy_differential_evolution"
141 are supported options. To further inform yourself about these frameworks, please see:
142 - `dlib <http://dlib.net/python/index.html>`_
143 - `scipy minimize <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html>`_
144 - `scipy differential evolution <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.differential_evolution.html>`_
145 - 'pymoo' <https://pymoo.org/index.html>
146 :param str method:
147 The method you pass depends on the methods available in the framework
148 you chose when setting up the class. Some frameworks don't require a
149 method, as only one exists. This is the case for dlib. For any framework
150 with different methods, you must provide one.
151 For the scipy.differential_evolution function, method is equal to the
152 strategy.
153 For the pymoo function, method is equal to the
154 algorithm.
155 :param int n_cpu:
156 Number of parallel processes used for the evaluation.
157 Ignored if the framework-method combination does not
158 support multi-processing.
160 Keyword arguments:
161 Depending on the framework an method you use, you can fine-tune the
162 optimization tool using extra arguments. We refer to the documentation of
163 each framework for a listing of what parameters are supported and how
164 to set them.
165 E.g. For scipy.optimize.minimize one could
166 add "tol=1e-3" as a kwarg.
168 :return: res
169 Optimization result.
170 """
171 # Choose the framework
172 minimize_func, requires_method = self._choose_framework(framework)
173 if method is None and requires_method:
174 raise ValueError(f"{framework} requires a method, but None is "
175 f"provided. Please choose one.")
176 # Perform minimization
177 res = minimize_func(method=method, n_cpu=n_cpu, **kwargs)
178 return res
180 def _choose_framework(self, framework):
181 """
182 Function to select the functions for optimization
183 and for executing said functions.
185 :param str framework:
186 String for selection of the relevant function. Supported options are:
187 - scipy_minimize
188 - dlib_minimize
189 - scipy_differential_evolution
190 - pymoo
191 """
192 if framework.lower() == "scipy_minimize":
193 return self._scipy_minimize, True
194 if framework.lower() == "dlib_minimize":
195 return self._dlib_minimize, False
196 if framework.lower() == "scipy_differential_evolution":
197 return self._scipy_differential_evolution, True
198 if framework.lower() == "pymoo":
199 return self._pymoo, True
200 if framework.lower() == "bayesian_optimization":
201 return self._bayesian_optimization, False
203 raise TypeError(f"Given framework {framework} is currently not supported.")
205 def _bayesian_optimization(self, method=None, n_cpu=1, **kwargs):
206 """
207 Possible kwargs for the bayesian_optimization function with default values:
209 random_state = 42
210 allow_dublicate_points = True
211 init_points = 100
212 n_iter = 100
213 kind_of_utility_function = "ei"
214 xi = 0.1
216 For an explanation of what the parameters do, we refer to the documentation of
217 the bayesian optimization package:
218 https://bayesian-optimization.github.io/BayesianOptimization/index.html
219 """
220 default_kwargs = self.get_default_config(framework="bayesian_optimization")
221 default_kwargs.update(kwargs)
223 try:
224 from bayes_opt import BayesianOptimization
225 from bayes_opt.util import UtilityFunction
226 except ImportError as error:
227 raise ImportError("Please install bayesian-optimization to use "
228 "the bayesian_optimization function.") from error
230 try:
231 if self.bounds is None:
232 raise ValueError("For the bayesian optimization approach, you need to specify "
233 "boundaries. Currently, no bounds are specified.")
235 pbounds = {f"x{n}": i for n, i in enumerate(self.bounds)}
237 optimizer = BayesianOptimization(
238 f=self._bayesian_opt_obj,
239 pbounds=pbounds,
240 random_state=default_kwargs["random_state"],
241 allow_duplicate_points=default_kwargs["allow_dublicate_points"],
242 verbose=default_kwargs["verbose"]
243 )
245 gp = default_kwargs.get("gp", None)
246 if gp is not None:
247 optimizer._gp = gp
249 acq_function = UtilityFunction(
250 kind=default_kwargs["kind_of_utility_function"],
251 xi=default_kwargs["xi"])
253 optimizer.maximize(
254 init_points=default_kwargs["init_points"],
255 n_iter=default_kwargs["n_iter"],
256 acquisition_function=acq_function
257 )
259 res = optimizer.max
260 x_res = np.array(list(res["params"].values()))
261 f_res = -res["target"]
262 res_tuple = namedtuple("res_tuple", "x fun")
263 res = res_tuple(x=x_res, fun=f_res)
264 return res
265 except (KeyboardInterrupt, Exception) as error:
266 # pylint: disable=inconsistent-return-statements
267 self._handle_error(error)
269 def _bayesian_opt_obj(self, **kwargs):
270 """
271 This function is needed as the signature for the Bayesian-optimization
272 is different than the standard signature. The Bayesian-optimization gives keyword arguments for
273 every parameter and only maximizes, therefore we will maximize the negative objective function value.
274 """
275 xk = np.array(list(kwargs.values()))
276 return -self.obj(xk)
279 def _scipy_minimize(self, method, n_cpu=1, **kwargs):
280 """
281 Possible kwargs for the scipy minimize function with default values:
283 x0: Required
284 tol = None
285 options = None
286 constraints = {}
287 jac = None
288 hess = None
289 hessp = None
290 """
291 default_kwargs = self.get_default_config(framework="scipy_minimize")
292 default_kwargs.update(kwargs)
293 try:
294 import scipy.optimize as opt
295 except ImportError as error:
296 raise ImportError("Please install scipy to use "
297 "the minimize_scipy function.") from error
299 try:
300 if "x0" not in kwargs:
301 raise KeyError("An initial guess (x0) is required "
302 "for scipy.minimize. You passed None")
303 res = opt.minimize(
304 fun=self.obj,
305 x0=kwargs["x0"],
306 method=method,
307 jac=default_kwargs["jac"],
308 hess=default_kwargs["hess"],
309 hessp=default_kwargs["hessp"],
310 bounds=self.bounds,
311 constraints=default_kwargs["constraints"],
312 tol=default_kwargs["tol"],
313 options=default_kwargs["options"]
314 )
315 return res
316 except (KeyboardInterrupt, Exception) as error:
317 # pylint: disable=inconsistent-return-statements
318 self._handle_error(error)
320 def _dlib_minimize(self, method=None, n_cpu=1, **kwargs):
321 """
322 Possible kwargs for the dlib minimize function with default values:
324 is_integer_variable = None
325 solver_epsilon = 0
326 num_function_calls = int(1e9)
327 """
328 default_kwargs = self.get_default_config(framework="dlib_minimize")
329 default_kwargs.update(kwargs)
330 try:
331 import dlib
332 except ImportError as error:
333 raise ImportError("Please install dlib to use the minimize_dlib function.") from error
334 try:
335 _bounds_2d = np.array(self.bounds)
336 _bound_min = list(_bounds_2d[:, 0])
337 _bound_max = list(_bounds_2d[:, 1])
338 if "is_integer_variable" not in kwargs:
339 is_integer_variable = list(np.zeros(len(_bound_max)))
340 else:
341 is_integer_variable = kwargs["is_integer_variable"]
343 # This check is only necessary as the error-messages from dlib are quite indirect.
344 # Any new user would not get that these parameters cause the error.
345 for key in ["solver_epsilon", "num_function_calls"]:
346 value = kwargs.get(key)
347 if value is not None:
348 if not isinstance(value, (float, int)):
349 raise TypeError(
350 f"Given {key} is of type {type(value).__name__} but "
351 f"should be type float or int"
352 )
354 x_res, f_res = dlib.find_min_global(
355 f=self._dlib_obj,
356 bound1=_bound_min,
357 bound2=_bound_max,
358 is_integer_variable=is_integer_variable,
359 num_function_calls=int(default_kwargs["num_function_calls"]),
360 solver_epsilon=float(default_kwargs["solver_epsilon"])
361 )
362 res_tuple = namedtuple("res_tuple", "x fun")
363 res = res_tuple(x=x_res, fun=f_res)
364 return res
365 except (KeyboardInterrupt, Exception) as error:
366 # pylint: disable=inconsistent-return-statements
367 self._handle_error(error)
369 def _scipy_differential_evolution(self, method="best1bin", n_cpu=1, **kwargs):
370 """
371 Possible kwargs for the dlib minimize function with default values:
373 maxiter = 1000
374 popsize = 15
375 tol = None
376 mutation = (0.5, 1)
377 recombination = 0.7
378 seed = None
379 polish = True
380 init = 'latinhypercube'
381 atol = 0
382 """
383 default_kwargs = self.get_default_config(framework="scipy_differential_evolution")
384 default_kwargs.update(kwargs)
385 try:
386 import scipy.optimize as opt
387 except ImportError as error:
388 raise ImportError("Please install scipy to use the minimize_scipy function.") from error
390 try:
391 if self.bounds is None:
392 raise ValueError("For the differential evolution approach, you need to specify "
393 "boundaries. Currently, no bounds are specified.")
395 res = opt.differential_evolution(
396 func=self.obj,
397 bounds=self.bounds,
398 strategy=method,
399 maxiter=default_kwargs["maxiter"],
400 popsize=default_kwargs["popsize"],
401 tol=default_kwargs["tol"],
402 mutation=default_kwargs["mutation"],
403 recombination=default_kwargs["recombination"],
404 seed=default_kwargs["seed"],
405 disp=False, # We have our own logging
406 polish=default_kwargs["polish"],
407 init=default_kwargs["init"],
408 atol=default_kwargs["atol"]
409 )
410 return res
411 except (KeyboardInterrupt, Exception) as error:
412 # pylint: disable=inconsistent-return-statements
413 self._handle_error(error)
415 def _pymoo(self, method="GA", n_cpu=1, **kwargs):
416 """
417 Possible kwargs for the dlib minimize function with default values:
419 algorithm=NGSA2
420 termination=None
421 seed=None
422 verbose=False
423 display=None
424 callback=None
425 save_history=False
426 copy_algorithm=False
427 copy_termination=False
428 """
429 default_kwargs = self.get_default_config(framework="pymoo")
430 default_kwargs.update(kwargs)
432 try:
433 from pymoo.optimize import minimize
434 from pymoo.problems.single import Problem
435 from pymoo.factory import get_sampling, get_mutation, get_crossover, get_selection
436 from pymoo.algorithms.moo.ctaea import CTAEA
437 from pymoo.algorithms.moo.moead import MOEAD
438 from pymoo.algorithms.moo.nsga2 import NSGA2
439 from pymoo.algorithms.moo.nsga3 import NSGA3
440 from pymoo.algorithms.moo.rnsga2 import RNSGA2
441 from pymoo.algorithms.moo.rnsga3 import RNSGA3
442 from pymoo.algorithms.soo.nonconvex.de import DE
443 from pymoo.algorithms.soo.nonconvex.ga import GA
444 from pymoo.algorithms.moo.unsga3 import UNSGA3
445 from pymoo.algorithms.soo.nonconvex.nelder_mead import NelderMead
446 from pymoo.algorithms.soo.nonconvex.brkga import BRKGA
447 from pymoo.algorithms.soo.nonconvex.pattern_search import PatternSearch
448 from pymoo.algorithms.soo.nonconvex.pso import PSO
450 except ImportError as error:
451 raise ImportError("Please install pymoo to use this function.") from error
454 pymoo_algorithms = {
455 "ga": GA,
456 "brkga": BRKGA,
457 "de": DE,
458 "nelder-mead": NelderMead,
459 "pattern-search": PatternSearch,
460 "pso": PSO,
461 "nsga2": NSGA2,
462 "rnsga2": RNSGA2,
463 "nsga3": NSGA3,
464 "unsga3": UNSGA3,
465 "rnsga3": RNSGA3,
466 "moead": MOEAD,
467 "ctaea": CTAEA,
468 }
470 if method.lower() not in pymoo_algorithms:
471 raise ValueError(f"Given method {method} is currently not supported. Please choose one of the "
472 "following: " + ", ".join(pymoo_algorithms.keys()))
474 class EBCPYProblem(Problem):
475 """Construct wrapper problem class."""
476 def __init__(self,
477 ebcpy_class: Optimizer
478 ):
479 self.ebcpy_class = ebcpy_class
480 super().__init__(n_var=len(ebcpy_class.bounds),
481 n_obj=1,
482 n_constr=0,
483 xl=np.array([bound[0] for bound in ebcpy_class.bounds]),
484 xu=np.array([bound[1] for bound in ebcpy_class.bounds])
485 )
487 def _evaluate(self, x, out, *args, **kwargs):
488 if n_cpu > 1:
489 out["F"] = self.ebcpy_class.mp_obj(x, n_cpu, *args)
490 else:
491 out["F"] = np.array([self.ebcpy_class.obj(xk=_x, *args) for _x in x])
493 try:
494 if self.bounds is None:
495 raise ValueError("For pymoo, you need to specify "
496 "boundaries. Currently, no bounds are specified.")
498 termination = default_kwargs.pop("termination")
499 if termination is None:
500 termination = ("n_gen", default_kwargs.pop("n_gen"))
501 seed = default_kwargs.pop("seed")
502 verbose = default_kwargs.pop("verbose")
503 save_history = default_kwargs.pop("save_history")
504 copy_algorithm = default_kwargs.pop("copy_algorithm")
505 copy_termination = default_kwargs.pop("copy_termination")
506 callback = default_kwargs.pop("callback")
507 display = default_kwargs.pop("display")
509 if "selection" in default_kwargs.keys():
510 default_kwargs["selection"] = get_selection(name=default_kwargs["selection"])
511 if "crossover" in default_kwargs.keys():
512 default_kwargs["crossover"] = get_crossover(name=default_kwargs["crossover"])
513 if "sampling" in default_kwargs.keys():
514 default_kwargs["sampling"] = get_sampling(name=default_kwargs["sampling"])
515 if "mutation" in default_kwargs.keys():
516 default_kwargs["mutation"] = get_mutation(name=default_kwargs["mutation"])
517 algorithm = pymoo_algorithms[method.lower()](**default_kwargs)
519 res = minimize(
520 problem=EBCPYProblem(ebcpy_class=self),
521 algorithm=algorithm,
522 termination=termination,
523 seed=seed,
524 verbose=verbose,
525 display=display,
526 callback=callback,
527 save_history=save_history,
528 copy_algorithm=copy_algorithm,
529 copy_termination=copy_termination,
530 )
531 res_tuple = namedtuple("res_tuple", "x fun")
532 res = res_tuple(x=res.X, fun=res.F[0])
533 return res
534 except (KeyboardInterrupt, Exception) as error:
535 # pylint: disable=inconsistent-return-statements
536 self._handle_error(error)
538 def _dlib_obj(self, *args):
539 """
540 This function is needed as the signature for the dlib-obj
541 is different than the standard signature. dlib will parse a number of
542 parameters
543 """
544 return self.obj(np.array(args))
546 def _handle_error(self, error):
547 """
548 Function to handle the case when an optimization step fails (e.g. simulation-fail).
549 The parameter set which caused the failure and the best iterate until this point
550 are of interest for the user in such case.
551 :param error:
552 Any Exception that may occur
553 """
554 self.logger.error(f"Parameter set which caused the failure: {self._current_iterate}")
555 self.logger.error("Current best objective and parameter set:")
556 self.logger.error("\n".join([f"{key}: {value}"
557 for key, value in self._current_best_iterate.items()]))
558 raise error
560 @staticmethod
561 def get_default_config(framework: str) -> dict:
562 """
563 Return the default config or kwargs for the
564 given framework.
566 The default values are extracted of the corresponding
567 framework directly.
568 """
569 if framework.lower() == "scipy_minimize":
570 return {"tol": None,
571 "options": None,
572 "constraints": None,
573 "jac": None,
574 "hess": None,
575 "hessp": None}
576 if framework.lower() == "dlib_minimize":
577 return {"num_function_calls": int(1e9),
578 "solver_epsilon": 0}
579 if framework.lower() == "scipy_differential_evolution":
580 return {"maxiter": 1000,
581 "popsize": 15,
582 "tol": 0.01,
583 "mutation": (0.5, 1),
584 "recombination": 0.7,
585 "seed": None,
586 "polish": True,
587 "init": 'latinhypercube',
588 "atol": 0
589 }
590 if framework.lower() == "pymoo":
591 return {"n_gen": 1000,
592 "termination": None,
593 "seed": 1,
594 "verbose": False,
595 "display": None,
596 "callback": None,
597 "save_history": False,
598 "copy_algorithm": False,
599 "copy_termination": False
600 }
601 if framework.lower() == "bayesian_optimization":
602 return {"random_state": 42,
603 "allow_dublicate_points": True,
604 "init_points": 5,
605 "n_iter": 25,
606 "kind_of_utility_function": "ei",
607 "xi": 0.1,
608 "verbose": False
609 }
610 return {}