Source code for agentlib_mpc.data_structures.casadi_utils

"""Stores all sorts of Dataclasses, Enums or Factories to help with the
CasadiBackend."""

import os
import random
import subprocess
from contextlib import contextmanager
from dataclasses import dataclass, field
from logging import Logger
from pathlib import Path
from typing import Union, List, NamedTuple, Literal

import casadi as ca
from enum import Enum

from pydantic import ConfigDict, Field, BaseModel

from agentlib_mpc.data_structures import mpc_datamodels


CaFuncInputs = Union[ca.MX, ca.SX, ca.Sparsity, ca.DM, float, int]
DiscreteVars = List[bool]
GUESS_PREFIX = "guess_"
LB_PREFIX = "lb_"
UB_PREFIX = "ub_"

# Casadi Matrices specifying the input of all different types
# of optimization parameters. Matrices consist of different variable rows
# and have a column for each time step in the discretization.
# There are separate matrices for each input type (as defined in the
# System), and also for the upper and lower boundaries of variables
# respectively.
# Example:
# {"x": [[1, 2], [0, 2]],
# "lb_x": [[0, 0], [0, 0]],
# "ub_x": [[0, 0], [0, 0]],
# "d": [[2, 1], [1, 1]]
# }
MPCInputs = dict[str, ca.DM]


[docs]class DiscretizationMethod(str, Enum): collocation = "collocation" multiple_shooting = "multiple_shooting"
[docs]class CollocationMethod(str, Enum): radau = "radau" legendre = "legendre"
[docs]class Solvers(str, Enum): ipopt = "ipopt" qpoases = "qpoases" sqpmethod = "sqpmethod" gurobi = "gurobi" bonmin = "bonmin" fatrop = "fatrop" proxqp = "proxqp" osqp = "osqp"
[docs]class Integrators(str, Enum): cvodes = "cvodes" rk = "rk" # runge-kutta euler = "euler"
[docs]class CasadiDiscretizationOptions(mpc_datamodels.DiscretizationOptions): model_config = ConfigDict(extra="forbid") method: DiscretizationMethod = DiscretizationMethod.collocation collocation_order: int = Field(default=3, ge=1, le=9) collocation_method: CollocationMethod = CollocationMethod.legendre integrator: Integrators = Integrators.cvodes
[docs]class SolverOptions(BaseModel): name: Solvers = "ipopt" options: dict = Field(default={}) model_config = ConfigDict(extra="forbid")
[docs]@dataclass class OptParMXContainer: """Stores the necessary MX variables created during discretization for OptimizationParameters.""" var: List[ca.MX] = field(default_factory=list) # res format grid: List[float] = field(default_factory=list) # res format, mpc inputs
[docs]@dataclass class OptVarMXContainer(OptParMXContainer): """Stores the necessary MX variables created during discretization for OptimizationVariables.""" lb: List[ca.MX] = field(default_factory=list) # res format ub: List[ca.MX] = field(default_factory=list) # res format guess: List[ca.MX] = field(default_factory=list) # res format opt: ca.DM = None # mpc inputs
[docs]@dataclass class Constraint: function: ca.MX lb: ca.MX ub: ca.MX
[docs]class ModelConstraint(NamedTuple): lb: CaFuncInputs function: ca.MX ub: CaFuncInputs
[docs]@dataclass class SolverFactory: """Creates a solver given an NLP and an options construct.""" do_jit: bool bat_file: Path = None name: str = None options: SolverOptions = field(default_factory=SolverOptions) logger: Logger = None
[docs] def create_solver( self, nlp: Union[dict, str], discrete: DiscreteVars = None, equalities: list[bool] = None, ) -> ca.Function: options = self.options.options solver_name = self.options.name.casefold() if solver_name == Solvers.ipopt: return self._create_ipopt_solver(nlp=nlp, options=options) if solver_name == Solvers.fatrop: return self._create_fatrop_solver( nlp=nlp, options=options, equalities=equalities ) if solver_name == Solvers.sqpmethod: return self._create_sqpmethod_solver(nlp=nlp, options=options) if solver_name == Solvers.qpoases: return self._create_qpoases_solver(nlp=nlp, options=options) if solver_name == Solvers.proxqp: return self._create_proxqp_solver(nlp=nlp, options=options) if solver_name == Solvers.osqp: return self._create_osqp_solver(nlp=nlp, options=options) if solver_name == Solvers.gurobi: return self._create_gurobi_solver( nlp=nlp, options=options, discrete=discrete ) if solver_name == Solvers.bonmin: return self._create_bonmin_solver( nlp=nlp, options=options, discrete=discrete ) raise ValueError( f'Solver "{solver_name}" not recognized. Currently ' f"supported: {[s.value for s in Solvers]}" )
def _create_fatrop_solver(self, nlp: dict, options: dict, equalities: list[bool]): # equality = [True for _ in range(nlp["g"].shape[0])] default_opts = { "verbose": False, "print_time": False, "record_time": True, "expand": True, "structure_detection": "auto", "equality": equalities, "fatrop": { "max_iter": 100, "tol": 1e-4, # "mu_init": 1e-2, "print_level": 0, }, } default_solver_opts = options.pop("fatrop", {}) opts = {**default_opts, **options} opts["fatrop"].update(default_solver_opts) if self.do_jit: opts["expand"] = False # compiled code is better not expanded solver = self.make_casadi_nlp(nlp, "fatrop", opts, "nlp") if not self.do_jit: return solver nlp = compile_solver(bat_file=self.bat_file, optimizer=solver, name=self.name) return self.make_casadi_nlp(nlp, "fatrop", opts, "nlp") def _create_ipopt_solver(self, nlp: dict, options: dict): default_opts = { "verbose": False, "print_time": False, "record_time": True, "expand": True, "ipopt": { # "mu_init": 1e-2, "max_iter": 100, "tol": 1e-4, "acceptable_tol": 0.1, "acceptable_constr_viol_tol": 1, "acceptable_iter": 5, "acceptable_compl_inf_tol": 1, "print_level": 0, }, } ipopt_ = options.pop("ipopt", {}) opts = {**default_opts, **options} opts["ipopt"].update(ipopt_) if self.do_jit: opts["expand"] = False # compiled code is better not expanded solver = self.make_casadi_nlp(nlp, "ipopt", opts, "nlp") if not self.do_jit: return solver nlp = compile_solver(bat_file=self.bat_file, optimizer=solver, name=self.name) return self.make_casadi_nlp(nlp, "ipopt", opts, "nlp") def _create_sqpmethod_solver(self, nlp: dict, options: dict): default_opts = { "qpsol": "osqp", "qpsol_options": {"error_on_fail": False}, "print_iteration": False, "print_status": False, "print_header": False, "print_time": False, "max_iter": 20, "tol_du": 0.01, "tol_pr": 0.0001, } opts = {**default_opts, **options} return ca.nlpsol("mpc", "sqpmethod", nlp, opts) def _create_qpoases_solver(self, nlp: dict, options: dict): default_opts = { "verbose": False, "print_time": False, "record_time": True, "printLevel": "low", } opts = {**default_opts, **options} return ca.qpsol("mpc", "qpoases", nlp, opts) def _create_proxqp_solver(self, nlp: dict, options: dict): default_opts = { "verbose": False, "print_time": False, "record_time": True, "proxqp": {"max_iter": 200, "eps_abs": 1e-4, "backend": "sparse"}, } opts = {**default_opts, **options} return ca.qpsol("mpc", "proxqp", nlp, opts) def _create_osqp_solver(self, nlp: dict, options: dict): default_opts = { "verbose": False, "print_time": False, "record_time": True, "osqp": {"max_iter": 200, "eps_abs": 1e-4, "verbose": False}, } opts = {**default_opts, **options} return ca.qpsol("mpc", "osqp", nlp, opts) def _create_gurobi_solver( self, nlp: dict, options: dict, discrete: DiscreteVars = None ): default_opts = {} opts = {**default_opts, **options, "discrete": discrete} return ca.qpsol("mpc", "gurobi", nlp, opts) def _create_bonmin_solver( self, nlp: dict, options: dict, discrete: DiscreteVars = None ): default_opts = { "bonmin.bb_log_level": 0, "bonmin.bb_log_interval": 1000, "bonmin.nlp_log_level": 0, } opts = {**default_opts, **options, "discrete": discrete} return ca.nlpsol("mpc", "bonmin", nlp, opts)
[docs] def make_casadi_nlp( self, problem: Union[dict, str], solver: str, opts: dict, problem_type: Literal["nlp", "qp"] = "nlp", ): ca_sol = ca.nlpsol if problem_type == "nlp" else ca.qpsol try: solver = ca_sol("mpc", solver, problem, opts) except RuntimeError: solver = ca_sol("mpc", solver, problem, {**opts, "expand": False}) if not self.do_jit: self.logger.info( "Tried setting up nlp with 'expand'=True, but your problem " "formulation contains non-expandable elements (e.g. using cvodes " "as integrator, or interpolation tables.)" ) return solver
[docs]@contextmanager def temporary_directory(path): old_pwd = os.getcwd() os.chdir(path) try: yield finally: os.chdir(old_pwd)
[docs]def compile_solver(bat_file: Path, name: str, optimizer: ca.Function) -> str: """ Code-generates an ipopt solver and compiles it. Currently, only works on Windows! Requires a batch file that knows how to setup Visual Studio command line and compiles the source code. Returns: The Path to the .dll file for the compiled solver. Raises: TypeError FileNotFoundError RuntimeError """ if not name: name = f"nlp_{random.randint(10, 1000)}" base_name = name file_name = f"{name}.c" file = Path(file_name) i = 0 # "build_batch_bat": "D:/ses-tsp/masterarbeit-miocp/02_Work/agentlib_mpc_9_approximate_miocp/examples/prod_cons_nmpc/solver_lib/compile_nlp.bat", c_dir = Path(Path(bat_file).parent, "code_gen") c_dir.mkdir(exist_ok=True) batch = str(Path(bat_file).absolute()) with temporary_directory(c_dir): while file.exists(): name = f"{base_name}_{i}" file_name = f"{name}.c" file = Path(file_name) i = i + 1 with temporary_directory(c_dir): optimizer.generate_dependencies(file_name) try: with temporary_directory(c_dir): ret = subprocess.call([batch, file_name]) except TypeError as e: # no batch file was provided raise TypeError( "You need to provide a batch file to " "compile the solver in the backend config." ) from e except FileNotFoundError as e: # provided batch file does not exist raise FileNotFoundError from e if ret != 0: raise RuntimeError( "The provided batch file did not exit properly, please " "check it properly compiles the provided file." ) solver_dll = Path(c_dir).joinpath(f"{name}.dll").absolute() return str(solver_dll)