Source code for agentlib_mpc.optimization_backends.casadi_.casadi_admm_ml

import logging
from typing import Union

import casadi as ca

from agentlib_mpc.models.casadi_model import CasadiInput, CasadiParameter
from agentlib_mpc.data_structures.casadi_utils import (
    LB_PREFIX,
    UB_PREFIX,
    DiscretizationMethod,
    Constraint,
)
from agentlib_mpc.data_structures.ml_model_datatypes import name_with_lag
from agentlib_mpc.models.casadi_ml_model import CasadiMLModel
from agentlib_mpc.optimization_backends.casadi_.casadi_ml import (
    CasadiMLSystem,
    CasADiBBBackend,
    MultipleShooting_ML,
)
from agentlib_mpc.optimization_backends.casadi_.core.VariableGroup import (
    OptimizationVariable,
    OptimizationParameter,
)
from agentlib_mpc.data_structures import admm_datatypes
from agentlib_mpc.optimization_backends.casadi_.admm import (
    ADMMMultipleShooting,
    CasadiADMMSystem,
    CasADiADMMBackend,
)

logger = logging.getLogger(__name__)


[docs]class CasadiADMMNNSystem(CasadiADMMSystem, CasadiMLSystem): """ In this class, the lags are determined by the trainer alone and the lags are saved in the serialized MLModel so that it doesn't have to be defined in the model again """ past_couplings: OptimizationParameter past_exchange: OptimizationParameter
[docs] def initialize( self, model: CasadiMLModel, var_ref: admm_datatypes.VariableReference ): # define variables self.states = OptimizationVariable.declare( denotation="state", variables=model.get_states(var_ref.states), ref_list=var_ref.states, assert_complete=True, ) self.controls = OptimizationVariable.declare( denotation="control", variables=model.get_inputs(var_ref.controls), ref_list=var_ref.controls, assert_complete=True, ) self.algebraics = OptimizationVariable.declare( denotation="z", variables=model.auxiliaries, ref_list=[], ) self.outputs = OptimizationVariable.declare( denotation="y", variables=model.outputs, ref_list=var_ref.outputs, ) # define parameters self.non_controlled_inputs = OptimizationParameter.declare( denotation="d", variables=model.get_inputs(var_ref.inputs), ref_list=var_ref.inputs, assert_complete=True, ) self.model_parameters = OptimizationParameter.declare( denotation="parameter", variables=model.parameters, ref_list=var_ref.parameters, ) self.initial_state = OptimizationParameter.declare( denotation="initial_state", # append the 0 as a convention to get initial guess variables=model.get_states(var_ref.states), ref_list=var_ref.states, use_in_stage_function=False, assert_complete=True, ) self.last_control = OptimizationParameter.declare( denotation="initial_control", # append the 0 as a convention to get initial guess variables=model.get_inputs(var_ref.controls), ref_list=var_ref.controls, use_in_stage_function=False, assert_complete=True, ) self.r_del_u = OptimizationParameter.declare( denotation="r_del_u", variables=[CasadiParameter(name=r_del_u) for r_del_u in var_ref.r_del_u], ref_list=var_ref.r_del_u, use_in_stage_function=False, assert_complete=True, ) self.cost_function = model.cost_func self.model_constraints = Constraint( function=ca.vertcat(*[c.function for c in model.get_constraints()]), lb=ca.vertcat(*[c.lb for c in model.get_constraints()]), ub=ca.vertcat(*[c.ub for c in model.get_constraints()]), ) self.sim_step = model.make_predict_function_for_mpc() self.model = model self.lags_dict: dict[str, int] = model.lags_dict coup_names = [c.name for c in var_ref.couplings] exchange_names = [c.name for c in var_ref.exchange] pure_outs = [ m for m in model.outputs if m.name not in coup_names + exchange_names ] self.outputs = OptimizationVariable.declare( denotation="y", variables=pure_outs, ref_list=var_ref.outputs, ) self.local_couplings = OptimizationVariable.declare( denotation="local_couplings", variables=[model.get(name) for name in coup_names], ref_list=coup_names, ) couplings_global = [coup.mean for coup in var_ref.couplings] self.global_couplings = OptimizationParameter.declare( denotation="global_couplings", variables=[CasadiInput(name=coup) for coup in couplings_global], ref_list=couplings_global, ) multipliers = [coup.multiplier for coup in var_ref.couplings] self.multipliers = OptimizationParameter.declare( denotation="multipliers", variables=[CasadiInput(name=coup) for coup in multipliers], ref_list=multipliers, ) self.local_exchange = OptimizationVariable.declare( denotation="local_exchange", variables=[model.get(name) for name in exchange_names], ref_list=exchange_names, ) couplings_mean_diff = [coup.mean_diff for coup in var_ref.exchange] self.exchange_diff = OptimizationParameter.declare( denotation="average_diff", variables=[CasadiInput(name=coup) for coup in couplings_mean_diff], ref_list=couplings_mean_diff, ) multipliers = [coup.multiplier for coup in var_ref.exchange] self.exchange_multipliers = OptimizationParameter.declare( denotation="exchange_multipliers", variables=[CasadiInput(name=coup) for coup in multipliers], ref_list=multipliers, ) self.penalty_factor = OptimizationParameter.declare( denotation="rho", variables=[CasadiParameter(name="penalty_factor")], ref_list=["penalty_factor"], ) past_coup_names = [coup.lagged for coup in var_ref.couplings] self.past_couplings = OptimizationParameter.declare( denotation="past_couplings", variables=[CasadiInput(name=name) for name in past_coup_names], ref_list=past_coup_names, use_in_stage_function=False, ) past_exchange_names = [exchange.lagged for exchange in var_ref.exchange] self.past_exchange = OptimizationParameter.declare( denotation="past_exchange", variables=[CasadiInput(name=name) for name in exchange_names], ref_list=past_exchange_names, use_in_stage_function=False, ) # add admm terms to objective function admm_objective = 0 rho = self.penalty_factor.full_symbolic[0] for i in range(len(var_ref.couplings)): admm_in = self.global_couplings.full_symbolic[i] admm_out = self.local_couplings.full_symbolic[i] admm_lam = self.multipliers.full_symbolic[i] admm_objective += admm_lam * admm_out + rho / 2 * (admm_in - admm_out) ** 2 for i in range(len(var_ref.exchange)): admm_in = self.exchange_diff.full_symbolic[i] admm_out = self.local_exchange.full_symbolic[i] admm_lam = self.exchange_multipliers.full_symbolic[i] admm_objective += admm_lam * admm_out + rho / 2 * (admm_in - admm_out) ** 2 self.cost_function += admm_objective
@property def variables(self) -> list[OptimizationVariable]: return [ var for var in self.__dict__.values() if isinstance(var, OptimizationVariable) ] @property def parameters(self) -> list[OptimizationParameter]: return [ var for var in self.__dict__.values() if isinstance(var, OptimizationParameter) ] @property def quantities(self) -> list[Union[OptimizationParameter, OptimizationVariable]]: return self.variables + self.parameters @property def sim_step_quantities( self, ) -> dict[str, Union[OptimizationParameter, OptimizationVariable]]: omit_in_blackbox_function = { "global_couplings", "multipliers", "average_diff", "exchange_multipliers", "rho", } return { var.name: var for var in self.quantities if not var.name in omit_in_blackbox_function }
[docs]class MultipleShootingADMMNN(ADMMMultipleShooting, MultipleShooting_ML): max_lag: int def _discretize(self, sys: CasadiADMMNNSystem): n = self.options.prediction_horizon ts = self.options.time_step # Parameters that are constant over the horizon const_par = self.add_opt_par(sys.model_parameters) rho = self.add_opt_par(sys.penalty_factor) du_weights = self.add_opt_par(sys.r_del_u) pre_grid_states = [ts * i for i in range(-sys.max_lag + 1, 1)] inputs_lag = min(-2, -sys.max_lag) # at least -2, to consider last control pre_grid_inputs = [ts * i for i in range(inputs_lag + 1, 0)] prediction_grid = [ts * i for i in range(0, n)] # sort for debugging purposes full_grid = sorted( list(set(prediction_grid + pre_grid_inputs + pre_grid_states)) ) # dict[time, dict[denotation, ca.MX]] mx_dict: dict[float, dict[str, ca.MX]] = {time: {} for time in full_grid} # add past state variables for time in pre_grid_states: self.pred_time = time x_past = self.add_opt_par(sys.initial_state) # add past states as optimization variables with fixed values so they can # be accessed by the first few steps, when there are lags mx_dict[time][sys.states.name] = self.add_opt_var( sys.states, lb=x_past, ub=x_past, guess=x_past ) mx_dict[time][sys.initial_state.name] = x_past # add past inputs for time in pre_grid_inputs: self.pred_time = time d = sys.non_controlled_inputs mx_dict[time][d.name] = self.add_opt_par(d) u_past = self.add_opt_par(sys.last_control) mx_dict[time][sys.controls.name] = self.add_opt_var( sys.controls, lb=u_past, ub=u_past, guess=u_past ) mx_dict[time][sys.last_control.name] = u_past # admm quantities past_coup = self.add_opt_par(sys.past_couplings) past_exch = self.add_opt_par(sys.past_exchange) mx_dict[time][sys.local_couplings.name] = past_coup mx_dict[time][sys.local_exchange.name] = past_exch mx_dict[time][sys.local_couplings.name] = self.add_opt_var( sys.local_couplings, lb=past_coup, ub=past_coup, guess=past_coup ) mx_dict[time][sys.local_exchange.name] = self.add_opt_var( sys.local_exchange, lb=past_exch, ub=past_exch, guess=past_exch ) # add all variables over future grid for time in prediction_grid: self.pred_time = time mx_dict[time][sys.controls.name] = self.add_opt_var(sys.controls) mx_dict[time][sys.non_controlled_inputs.name] = self.add_opt_par( sys.non_controlled_inputs ) mx_dict[time][sys.algebraics.name] = self.add_opt_var(sys.algebraics) mx_dict[time][sys.outputs.name] = self.add_opt_var(sys.outputs) # admm related quantities mx_dict[time][sys.multipliers.name] = self.add_opt_par(sys.multipliers) mx_dict[time][sys.exchange_multipliers.name] = self.add_opt_par( sys.exchange_multipliers ) mx_dict[time][sys.exchange_diff.name] = self.add_opt_par(sys.exchange_diff) mx_dict[time][sys.global_couplings.name] = self.add_opt_par( sys.global_couplings ) mx_dict[time][sys.local_exchange.name] = self.add_opt_var( sys.local_exchange ) mx_dict[time][sys.local_couplings.name] = self.add_opt_var( sys.local_couplings ) # create the state grid # x0 will always be the state at time 0 since the loop it is defined in starts # in the past and finishes at 0 self.pred_time = 0 for time in prediction_grid[1:]: self.pred_time = time mx_dict[time][sys.states.name] = self.add_opt_var(sys.states) self.pred_time += ts mx_dict[self.pred_time] = {sys.states.name: self.add_opt_var(sys.states)} all_quantities = sys.all_system_quantities() # add constraints and create the objective function for all stages for time in prediction_grid: stage_mx = mx_dict[time] # add penalty on control change between intervals u_prev = mx_dict[time - ts][sys.controls.name] uk = stage_mx[sys.controls.name] self.objective_function += ts * ca.dot(du_weights, (u_prev - uk) ** 2) # get stage arguments from current time step stage_arguments = { # variables sys.states.name: stage_mx[sys.states.name], sys.algebraics.name: stage_mx[sys.algebraics.name], sys.outputs.name: stage_mx[sys.outputs.name], # parameters sys.controls.name: stage_mx[sys.controls.name], sys.non_controlled_inputs.name: stage_mx[ sys.non_controlled_inputs.name ], sys.model_parameters.name: const_par, sys.penalty_factor.name: rho, # admm related quantities sys.multipliers.name: stage_mx[sys.multipliers.name], sys.exchange_multipliers.name: stage_mx[sys.exchange_multipliers.name], sys.exchange_diff.name: stage_mx[sys.exchange_diff.name], sys.global_couplings.name: stage_mx[sys.global_couplings.name], sys.local_exchange.name: stage_mx[sys.local_exchange.name], sys.local_couplings.name: stage_mx[sys.local_couplings.name], } # collect stage arguments for lagged variables for lag, denotation_dict in self._lagged_input_names.items(): for denotation, var_names in denotation_dict.items(): l_name = name_with_lag(denotation, lag) mx_list = [] for v_name in var_names: index = all_quantities[denotation].full_names.index(v_name) mx_list.append(mx_dict[time - lag * ts][denotation][index]) stage_arguments[l_name] = ca.vertcat(*mx_list) # evaluate a stage, add path constraints, multiple shooting constraints # and add to the objective function stage_result = self._stage_function(**stage_arguments) self.add_constraint( stage_result["model_constraints"], lb=stage_result["lb_model_constraints"], ub=stage_result["ub_model_constraints"], ) self.add_constraint( stage_result["next_states"] - mx_dict[time + ts][sys.states.name] ) self.objective_function += stage_result["cost_function"] * ts def _construct_stage_function(self, system: CasadiADMMNNSystem): """ Combine information from the model and the var_ref to create CasADi functions which describe the system dynamics and constraints at each stage of the optimization problem. Sets the stage function. It has all mpc variables as inputs, sorted by denotation (declared in self.declare_quantities) and outputs ode, cost function and 3 outputs per constraint (constraint, lb_constraint, ub_constraint). In the basic case, it has the form: CasadiFunction: ['x', 'z', 'u', 'y', 'd', 'p'] -> ['ode', 'cost_function', 'model_constraints', 'ub_model_constraints', 'lb_model_constraints'] Args: system """ all_system_quantities = system.all_system_quantities() constraints = {"model_constraints": system.model_constraints} inputs = [ q.full_symbolic for q in all_system_quantities.values() if q.use_in_stage_function ] input_denotations = [ q.name for denotation, q in all_system_quantities.items() if q.use_in_stage_function ] # aggregate constraints constraints_func = [c.function for c in constraints.values()] constraints_lb = [c.lb for c in constraints.values()] constraints_ub = [c.ub for c in constraints.values()] constraint_denotations = list(constraints.keys()) constraint_lb_denotations = [LB_PREFIX + k for k in constraints] constraint_ub_denotations = [UB_PREFIX + k for k in constraints] # create a dictionary which holds all the inputs for the sim step of the model all_input_variables = {} lagged_inputs: dict[int, dict[str, ca.MX]] = {} # dict[lag, dict[denotation, list[var_name]]] lagged_input_names: dict[int, dict[str, list[str]]] = {} for q_name, quantity in system.sim_step_quantities.items(): if not quantity.use_in_stage_function: continue for v_id, v_name in enumerate(quantity.full_names): all_input_variables[v_name] = quantity.full_symbolic[v_id] lag = system.lags_dict.get(v_name, 1) # if lag exists, we have to create and organize new variables for j in range(1, lag): # create an MX variable for this lag l_name = name_with_lag(v_name, j) new_lag_var = ca.MX.sym(l_name) all_input_variables[l_name] = new_lag_var # add the mx variable to its lag time and denotation lagged_inputs_j = lagged_inputs.setdefault(j, {}) lv_mx = lagged_inputs_j.setdefault(q_name, ca.DM([])) lagged_inputs[j][q_name] = ca.vertcat(lv_mx, new_lag_var) # keep track of the variable names that were added lagged_input_names_j = lagged_input_names.setdefault(j, {}) lv_names = lagged_input_names_j.setdefault(q_name, []) lv_names.append(v_name) self._lagged_input_names = lagged_input_names flat_lagged_inputs = { f"{den}_{i}": mx for i, subdict in lagged_inputs.items() for den, mx in subdict.items() } all_outputs = system.sim_step(**all_input_variables) state_output_it = (all_outputs[s_name] for s_name in system.states.full_names) state_output = ca.vertcat(*state_output_it) # aggregate outputs outputs = [ state_output, system.cost_function, *constraints_func, *constraints_lb, *constraints_ub, ] output_denotations = [ "next_states", "cost_function", *constraint_denotations, *constraint_lb_denotations, *constraint_ub_denotations, ] # function describing system dynamics and cost function self._stage_function = ca.Function( "f", inputs + list(flat_lagged_inputs.values()), outputs, # input handles to make kwarg use possible and to debug input_denotations + list(flat_lagged_inputs), # output handles to make kwarg use possible and to debug output_denotations, )
[docs]class CasADiADMMBackend_NN(CasADiADMMBackend, CasADiBBBackend): """ Class doing optimization with an MLModel. """ system_type = CasadiADMMNNSystem discretization_types = { DiscretizationMethod.multiple_shooting: MultipleShootingADMMNN } system: CasadiADMMNNSystem
# a dictionary of collections of the variable lags