Coverage for ebcpy/optimization.py: 86%

219 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-09-19 12:21 +0000

1"""Base-module for the whole optimization pacakge. 

2Used to define Base-Classes such as Optimizer and 

3Calibrator.""" 

4 

5import os 

6from pathlib import Path 

7import warnings 

8from typing import List, Tuple, Union 

9from collections import namedtuple 

10from abc import abstractmethod 

11import numpy as np 

12from ebcpy.utils import setup_logger 

13# pylint: disable=import-outside-toplevel 

14# pylint: disable=broad-except 

15 

16 

17class Optimizer: 

18 """ 

19 Base class for optimization in ebcpy. All classes 

20 performing optimization tasks must inherit from this 

21 class. 

22 The main feature of this class is the common interface 

23 for different available solvers in python. This makes the 

24 testing of different solvers and methods more easy. 

25 For available frameworks/solvers, check the function 

26 self.optimize(). 

27 

28 

29 :param str,Path working_directory: 

30 Directory for storing all output of optimization via a logger. 

31 :keyword list bounds: 

32 The boundaries for the optimization variables. 

33 """ 

34 

35 # Used to display number of obj-function-calls 

36 _counter = 0 

37 # Used to access the current parameter set if an optimization-step fails 

38 _current_iterate = np.array([]) 

39 # Used to access the best iterate if an optimization step fails 

40 _current_best_iterate = {"Objective": np.inf} 

41 # List storing every objective value for plotting and logging. 

42 # Can be used, but will enlarge runtime 

43 _obj_his = [] 

44 

45 def __init__(self, working_directory: Union[Path, str] = None, **kwargs): 

46 """Instantiate class parameters""" 

47 if working_directory is None and "cd" in kwargs: 

48 warnings.warn("cd was renamed to working_directory in all classes. Use working_directory instead.", category=DeprecationWarning) 

49 self.working_directory = kwargs["cd"] 

50 elif working_directory is None: 

51 self._working_directory = None 

52 else: 

53 self.working_directory = working_directory 

54 

55 self.logger = setup_logger(working_directory=self.working_directory, name=self.__class__.__name__) 

56 # Set kwargs 

57 self.bounds = kwargs.get("bounds", None) 

58 

59 @abstractmethod 

60 def obj(self, xk, *args): 

61 """ 

62 Base objective function. Overload this function and create your own 

63 objective function. Make sure that the return value is a scalar. 

64 Furthermore, the parameter vector xk is always a numpy array. 

65 

66 :param np.array xk: 

67 Array with parameters for optimization 

68 

69 :return: float result: 

70 A scalar (float/ 1d) value for the optimization framework. 

71 """ 

72 raise NotImplementedError(f'{self.__class__.__name__}.obj function is not defined') 

73 

74 @abstractmethod 

75 def mp_obj(self, x, *args): 

76 """ 

77 Objective function for Multiprocessing. 

78 

79 :param np.array x: 

80 Array with parameters for optimization. 

81 Shape of the array is (number_of_evaluations x number_of_variables). 

82 For instance, optimizating 10 variables and evaluating 

83 900 objectives in parallel, the shape would be 900 x 10. 

84 :param int n_cpu: 

85 Number of logical Processors to run optimization on. 

86 """ 

87 raise NotImplementedError(f'{self.__class__.__name__}.obj function is not defined') 

88 

89 @property 

90 def supported_frameworks(self): 

91 """ 

92 List with all frameworks supported by this 

93 wrapper class. 

94 """ 

95 return ["scipy_minimize", 

96 "scipy_differential_evolution", 

97 "dlib_minimize", 

98 "pymoo", 

99 "bayesian_optimization"] 

100 

101 @property 

102 def working_directory(self) -> Path: 

103 """The current working directory""" 

104 return self._working_directory 

105 

106 @working_directory.setter 

107 def working_directory(self, working_directory: Union[Path, str]): 

108 """Set current working directory""" 

109 if isinstance(working_directory, str): 

110 working_directory = Path(working_directory) 

111 os.makedirs(working_directory, exist_ok=True) 

112 self._working_directory = working_directory 

113 

114 @property 

115 def cd(self) -> Path: 

116 warnings.warn("cd was renamed to working_directory in all classes. Use working_directory instead instead.", category=DeprecationWarning) 

117 return self.working_directory 

118 

119 @cd.setter 

120 def cd(self, cd: Union[Path, str]): 

121 warnings.warn("cd was renamed to working_directory in all classes. Use working_directory instead instead.", category=DeprecationWarning) 

122 self.working_directory = cd 

123 

124 @property 

125 def bounds(self) -> List[Union[Tuple, List]]: 

126 """The boundaries of the optimization problem.""" 

127 return self._bounds 

128 

129 @bounds.setter 

130 def bounds(self, bounds): 

131 """Set the boundaries to the optimization variables""" 

132 self._bounds = bounds 

133 

134 def optimize(self, framework, method=None, n_cpu=1, **kwargs): 

135 """ 

136 Perform the optimization based on the given method and framework. 

137 

138 :param str framework: 

139 The framework (python module) you want to use to perform the optimization. 

140 Currently, "scipy_minimize", "dlib_minimize" and "scipy_differential_evolution" 

141 are supported options. To further inform yourself about these frameworks, please see: 

142 - `dlib <http://dlib.net/python/index.html>`_ 

143 - `scipy minimize <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html>`_ 

144 - `scipy differential evolution <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.differential_evolution.html>`_ 

145 - 'pymoo' <https://pymoo.org/index.html> 

146 :param str method: 

147 The method you pass depends on the methods available in the framework 

148 you chose when setting up the class. Some frameworks don't require a 

149 method, as only one exists. This is the case for dlib. For any framework 

150 with different methods, you must provide one. 

151 For the scipy.differential_evolution function, method is equal to the 

152 strategy. 

153 For the pymoo function, method is equal to the 

154 algorithm. 

155 :param int n_cpu: 

156 Number of parallel processes used for the evaluation. 

157 Ignored if the framework-method combination does not 

158 support multi-processing. 

159 

160 Keyword arguments: 

161 Depending on the framework an method you use, you can fine-tune the 

162 optimization tool using extra arguments. We refer to the documentation of 

163 each framework for a listing of what parameters are supported and how 

164 to set them. 

165 E.g. For scipy.optimize.minimize one could 

166 add "tol=1e-3" as a kwarg. 

167 

168 :return: res 

169 Optimization result. 

170 """ 

171 # Choose the framework 

172 minimize_func, requires_method = self._choose_framework(framework) 

173 if method is None and requires_method: 

174 raise ValueError(f"{framework} requires a method, but None is " 

175 f"provided. Please choose one.") 

176 # Perform minimization 

177 res = minimize_func(method=method, n_cpu=n_cpu, **kwargs) 

178 return res 

179 

180 def _choose_framework(self, framework): 

181 """ 

182 Function to select the functions for optimization 

183 and for executing said functions. 

184 

185 :param str framework: 

186 String for selection of the relevant function. Supported options are: 

187 - scipy_minimize 

188 - dlib_minimize 

189 - scipy_differential_evolution 

190 - pymoo 

191 """ 

192 if framework.lower() == "scipy_minimize": 

193 return self._scipy_minimize, True 

194 if framework.lower() == "dlib_minimize": 

195 return self._dlib_minimize, False 

196 if framework.lower() == "scipy_differential_evolution": 

197 return self._scipy_differential_evolution, True 

198 if framework.lower() == "pymoo": 

199 return self._pymoo, True 

200 if framework.lower() == "bayesian_optimization": 

201 return self._bayesian_optimization, False 

202 

203 raise TypeError(f"Given framework {framework} is currently not supported.") 

204 

205 def _bayesian_optimization(self, method=None, n_cpu=1, **kwargs): 

206 """ 

207 Possible kwargs for the bayesian_optimization function with default values: 

208  

209 random_state = 42 

210 allow_dublicate_points = True 

211 init_points = 100 

212 n_iter = 100 

213 kind_of_utility_function = "ei" 

214 xi = 0.1 

215  

216 For an explanation of what the parameters do, we refer to the documentation of 

217 the bayesian optimization package: 

218 https://bayesian-optimization.github.io/BayesianOptimization/index.html 

219 """ 

220 default_kwargs = self.get_default_config(framework="bayesian_optimization") 

221 default_kwargs.update(kwargs) 

222 

223 try: 

224 from bayes_opt import BayesianOptimization 

225 from bayes_opt.util import UtilityFunction 

226 except ImportError as error: 

227 raise ImportError("Please install bayesian-optimization to use " 

228 "the bayesian_optimization function.") from error 

229 

230 try: 

231 if self.bounds is None: 

232 raise ValueError("For the bayesian optimization approach, you need to specify " 

233 "boundaries. Currently, no bounds are specified.") 

234 

235 pbounds = {f"x{n}": i for n, i in enumerate(self.bounds)} 

236 

237 optimizer = BayesianOptimization( 

238 f=self._bayesian_opt_obj, 

239 pbounds=pbounds, 

240 random_state=default_kwargs["random_state"], 

241 allow_duplicate_points=default_kwargs["allow_dublicate_points"], 

242 verbose=default_kwargs["verbose"] 

243 ) 

244 

245 gp = default_kwargs.get("gp", None) 

246 if gp is not None: 

247 optimizer._gp = gp 

248 

249 acq_function = UtilityFunction( 

250 kind=default_kwargs["kind_of_utility_function"], 

251 xi=default_kwargs["xi"]) 

252 

253 optimizer.maximize( 

254 init_points=default_kwargs["init_points"], 

255 n_iter=default_kwargs["n_iter"], 

256 acquisition_function=acq_function 

257 ) 

258 

259 res = optimizer.max 

260 x_res = np.array(list(res["params"].values())) 

261 f_res = -res["target"] 

262 res_tuple = namedtuple("res_tuple", "x fun") 

263 res = res_tuple(x=x_res, fun=f_res) 

264 return res 

265 except (KeyboardInterrupt, Exception) as error: 

266 # pylint: disable=inconsistent-return-statements 

267 self._handle_error(error) 

268 

269 def _bayesian_opt_obj(self, **kwargs): 

270 """ 

271 This function is needed as the signature for the Bayesian-optimization 

272 is different than the standard signature. The Bayesian-optimization gives keyword arguments for 

273 every parameter and only maximizes, therefore we will maximize the negative objective function value. 

274 """ 

275 xk = np.array(list(kwargs.values())) 

276 return -self.obj(xk) 

277 

278 

279 def _scipy_minimize(self, method, n_cpu=1, **kwargs): 

280 """ 

281 Possible kwargs for the scipy minimize function with default values: 

282 

283 x0: Required 

284 tol = None 

285 options = None 

286 constraints = {} 

287 jac = None 

288 hess = None 

289 hessp = None 

290 """ 

291 default_kwargs = self.get_default_config(framework="scipy_minimize") 

292 default_kwargs.update(kwargs) 

293 try: 

294 import scipy.optimize as opt 

295 except ImportError as error: 

296 raise ImportError("Please install scipy to use " 

297 "the minimize_scipy function.") from error 

298 

299 try: 

300 if "x0" not in kwargs: 

301 raise KeyError("An initial guess (x0) is required " 

302 "for scipy.minimize. You passed None") 

303 res = opt.minimize( 

304 fun=self.obj, 

305 x0=kwargs["x0"], 

306 method=method, 

307 jac=default_kwargs["jac"], 

308 hess=default_kwargs["hess"], 

309 hessp=default_kwargs["hessp"], 

310 bounds=self.bounds, 

311 constraints=default_kwargs["constraints"], 

312 tol=default_kwargs["tol"], 

313 options=default_kwargs["options"] 

314 ) 

315 return res 

316 except (KeyboardInterrupt, Exception) as error: 

317 # pylint: disable=inconsistent-return-statements 

318 self._handle_error(error) 

319 

320 def _dlib_minimize(self, method=None, n_cpu=1, **kwargs): 

321 """ 

322 Possible kwargs for the dlib minimize function with default values: 

323 

324 is_integer_variable = None 

325 solver_epsilon = 0 

326 num_function_calls = int(1e9) 

327 """ 

328 default_kwargs = self.get_default_config(framework="dlib_minimize") 

329 default_kwargs.update(kwargs) 

330 try: 

331 import dlib 

332 except ImportError as error: 

333 raise ImportError("Please install dlib to use the minimize_dlib function.") from error 

334 try: 

335 _bounds_2d = np.array(self.bounds) 

336 _bound_min = list(_bounds_2d[:, 0]) 

337 _bound_max = list(_bounds_2d[:, 1]) 

338 if "is_integer_variable" not in kwargs: 

339 is_integer_variable = list(np.zeros(len(_bound_max))) 

340 else: 

341 is_integer_variable = kwargs["is_integer_variable"] 

342 

343 # This check is only necessary as the error-messages from dlib are quite indirect. 

344 # Any new user would not get that these parameters cause the error. 

345 for key in ["solver_epsilon", "num_function_calls"]: 

346 value = kwargs.get(key) 

347 if value is not None: 

348 if not isinstance(value, (float, int)): 

349 raise TypeError( 

350 f"Given {key} is of type {type(value).__name__} but " 

351 f"should be type float or int" 

352 ) 

353 

354 x_res, f_res = dlib.find_min_global( 

355 f=self._dlib_obj, 

356 bound1=_bound_min, 

357 bound2=_bound_max, 

358 is_integer_variable=is_integer_variable, 

359 num_function_calls=int(default_kwargs["num_function_calls"]), 

360 solver_epsilon=float(default_kwargs["solver_epsilon"]) 

361 ) 

362 res_tuple = namedtuple("res_tuple", "x fun") 

363 res = res_tuple(x=x_res, fun=f_res) 

364 return res 

365 except (KeyboardInterrupt, Exception) as error: 

366 # pylint: disable=inconsistent-return-statements 

367 self._handle_error(error) 

368 

369 def _scipy_differential_evolution(self, method="best1bin", n_cpu=1, **kwargs): 

370 """ 

371 Possible kwargs for the dlib minimize function with default values: 

372 

373 maxiter = 1000 

374 popsize = 15 

375 tol = None 

376 mutation = (0.5, 1) 

377 recombination = 0.7 

378 seed = None 

379 polish = True 

380 init = 'latinhypercube' 

381 atol = 0 

382 """ 

383 default_kwargs = self.get_default_config(framework="scipy_differential_evolution") 

384 default_kwargs.update(kwargs) 

385 try: 

386 import scipy.optimize as opt 

387 except ImportError as error: 

388 raise ImportError("Please install scipy to use the minimize_scipy function.") from error 

389 

390 try: 

391 if self.bounds is None: 

392 raise ValueError("For the differential evolution approach, you need to specify " 

393 "boundaries. Currently, no bounds are specified.") 

394 

395 res = opt.differential_evolution( 

396 func=self.obj, 

397 bounds=self.bounds, 

398 strategy=method, 

399 maxiter=default_kwargs["maxiter"], 

400 popsize=default_kwargs["popsize"], 

401 tol=default_kwargs["tol"], 

402 mutation=default_kwargs["mutation"], 

403 recombination=default_kwargs["recombination"], 

404 seed=default_kwargs["seed"], 

405 disp=False, # We have our own logging 

406 polish=default_kwargs["polish"], 

407 init=default_kwargs["init"], 

408 atol=default_kwargs["atol"] 

409 ) 

410 return res 

411 except (KeyboardInterrupt, Exception) as error: 

412 # pylint: disable=inconsistent-return-statements 

413 self._handle_error(error) 

414 

415 def _pymoo(self, method="GA", n_cpu=1, **kwargs): 

416 """ 

417 Possible kwargs for the dlib minimize function with default values: 

418 

419 algorithm=NGSA2 

420 termination=None 

421 seed=None 

422 verbose=False 

423 display=None 

424 callback=None 

425 save_history=False 

426 copy_algorithm=False 

427 copy_termination=False 

428 """ 

429 default_kwargs = self.get_default_config(framework="pymoo") 

430 default_kwargs.update(kwargs) 

431 

432 try: 

433 from pymoo.optimize import minimize 

434 from pymoo.problems.single import Problem 

435 from pymoo.factory import get_sampling, get_mutation, get_crossover, get_selection 

436 from pymoo.algorithms.moo.ctaea import CTAEA 

437 from pymoo.algorithms.moo.moead import MOEAD 

438 from pymoo.algorithms.moo.nsga2 import NSGA2 

439 from pymoo.algorithms.moo.nsga3 import NSGA3 

440 from pymoo.algorithms.moo.rnsga2 import RNSGA2 

441 from pymoo.algorithms.moo.rnsga3 import RNSGA3 

442 from pymoo.algorithms.soo.nonconvex.de import DE 

443 from pymoo.algorithms.soo.nonconvex.ga import GA 

444 from pymoo.algorithms.moo.unsga3 import UNSGA3 

445 from pymoo.algorithms.soo.nonconvex.nelder_mead import NelderMead 

446 from pymoo.algorithms.soo.nonconvex.brkga import BRKGA 

447 from pymoo.algorithms.soo.nonconvex.pattern_search import PatternSearch 

448 from pymoo.algorithms.soo.nonconvex.pso import PSO 

449 

450 except ImportError as error: 

451 raise ImportError("Please install pymoo to use this function.") from error 

452 

453 

454 pymoo_algorithms = { 

455 "ga": GA, 

456 "brkga": BRKGA, 

457 "de": DE, 

458 "nelder-mead": NelderMead, 

459 "pattern-search": PatternSearch, 

460 "pso": PSO, 

461 "nsga2": NSGA2, 

462 "rnsga2": RNSGA2, 

463 "nsga3": NSGA3, 

464 "unsga3": UNSGA3, 

465 "rnsga3": RNSGA3, 

466 "moead": MOEAD, 

467 "ctaea": CTAEA, 

468 } 

469 

470 if method.lower() not in pymoo_algorithms: 

471 raise ValueError(f"Given method {method} is currently not supported. Please choose one of the " 

472 "following: " + ", ".join(pymoo_algorithms.keys())) 

473 

474 class EBCPYProblem(Problem): 

475 """Construct wrapper problem class.""" 

476 def __init__(self, 

477 ebcpy_class: Optimizer 

478 ): 

479 self.ebcpy_class = ebcpy_class 

480 super().__init__(n_var=len(ebcpy_class.bounds), 

481 n_obj=1, 

482 n_constr=0, 

483 xl=np.array([bound[0] for bound in ebcpy_class.bounds]), 

484 xu=np.array([bound[1] for bound in ebcpy_class.bounds]) 

485 ) 

486 

487 def _evaluate(self, x, out, *args, **kwargs): 

488 if n_cpu > 1: 

489 out["F"] = self.ebcpy_class.mp_obj(x, n_cpu, *args) 

490 else: 

491 out["F"] = np.array([self.ebcpy_class.obj(xk=_x, *args) for _x in x]) 

492 

493 try: 

494 if self.bounds is None: 

495 raise ValueError("For pymoo, you need to specify " 

496 "boundaries. Currently, no bounds are specified.") 

497 

498 termination = default_kwargs.pop("termination") 

499 if termination is None: 

500 termination = ("n_gen", default_kwargs.pop("n_gen")) 

501 seed = default_kwargs.pop("seed") 

502 verbose = default_kwargs.pop("verbose") 

503 save_history = default_kwargs.pop("save_history") 

504 copy_algorithm = default_kwargs.pop("copy_algorithm") 

505 copy_termination = default_kwargs.pop("copy_termination") 

506 callback = default_kwargs.pop("callback") 

507 display = default_kwargs.pop("display") 

508 

509 default_kwargs["sampling"] = get_sampling(name=default_kwargs["sampling"]) 

510 default_kwargs["selection"] = get_selection(name=default_kwargs["selection"]) 

511 default_kwargs["crossover"] = get_crossover(name=default_kwargs["crossover"]) 

512 default_kwargs["mutation"] = get_mutation(name=default_kwargs["mutation"]) 

513 

514 algorithm = pymoo_algorithms[method.lower()](**default_kwargs) 

515 

516 res = minimize( 

517 problem=EBCPYProblem(ebcpy_class=self), 

518 algorithm=algorithm, 

519 termination=termination, 

520 seed=seed, 

521 verbose=verbose, 

522 display=display, 

523 callback=callback, 

524 save_history=save_history, 

525 copy_algorithm=copy_algorithm, 

526 copy_termination=copy_termination, 

527 ) 

528 res_tuple = namedtuple("res_tuple", "x fun") 

529 res = res_tuple(x=res.X, fun=res.F[0]) 

530 return res 

531 except (KeyboardInterrupt, Exception) as error: 

532 # pylint: disable=inconsistent-return-statements 

533 self._handle_error(error) 

534 

535 def _dlib_obj(self, *args): 

536 """ 

537 This function is needed as the signature for the dlib-obj 

538 is different than the standard signature. dlib will parse a number of 

539 parameters 

540 """ 

541 return self.obj(np.array(args)) 

542 

543 def _handle_error(self, error): 

544 """ 

545 Function to handle the case when an optimization step fails (e.g. simulation-fail). 

546 The parameter set which caused the failure and the best iterate until this point 

547 are of interest for the user in such case. 

548 :param error: 

549 Any Exception that may occur 

550 """ 

551 self.logger.error(f"Parameter set which caused the failure: {self._current_iterate}") 

552 self.logger.error("Current best objective and parameter set:") 

553 self.logger.error("\n".join([f"{key}: {value}" 

554 for key, value in self._current_best_iterate.items()])) 

555 raise error 

556 

557 @staticmethod 

558 def get_default_config(framework: str) -> dict: 

559 """ 

560 Return the default config or kwargs for the 

561 given framework. 

562 

563 The default values are extracted of the corresponding 

564 framework directly. 

565 """ 

566 if framework.lower() == "scipy_minimize": 

567 return {"tol": None, 

568 "options": None, 

569 "constraints": None, 

570 "jac": None, 

571 "hess": None, 

572 "hessp": None} 

573 if framework.lower() == "dlib_minimize": 

574 return {"num_function_calls": int(1e9), 

575 "solver_epsilon": 0} 

576 if framework.lower() == "scipy_differential_evolution": 

577 return {"maxiter": 1000, 

578 "popsize": 15, 

579 "tol": 0.01, 

580 "mutation": (0.5, 1), 

581 "recombination": 0.7, 

582 "seed": None, 

583 "polish": True, 

584 "init": 'latinhypercube', 

585 "atol": 0 

586 } 

587 if framework.lower() == "pymoo": 

588 return {"n_gen": 1000, 

589 "pop_size": 50, 

590 "sampling": "real_random", 

591 "selection": "random", 

592 "crossover": "real_sbx", 

593 "mutation": "real_pm", 

594 "eliminate_duplicates": True, 

595 "n_offsprings": None, 

596 "termination": None, 

597 "seed": 1, 

598 "verbose": False, 

599 "display": None, 

600 "callback": None, 

601 "save_history": False, 

602 "copy_algorithm": False, 

603 "copy_termination": False 

604 } 

605 if framework.lower() == "bayesian_optimization": 

606 return {"random_state": 42, 

607 "allow_dublicate_points": True, 

608 "init_points": 5, 

609 "n_iter": 25, 

610 "kind_of_utility_function": "ei", 

611 "xi": 0.1, 

612 "verbose": False 

613 } 

614 return {}