Source code for agentlib_mpc.optimization_backends.casadi_.casadi_ml

import casadi as ca
from typing import Dict
import collections

from agentlib_mpc.models.casadi_model import CasadiParameter

from agentlib_mpc.data_structures.casadi_utils import (
    LB_PREFIX,
    UB_PREFIX,
    DiscretizationMethod,
    SolverFactory,
    Constraint,
)
from agentlib_mpc.data_structures.ml_model_datatypes import name_with_lag
from agentlib_mpc.data_structures.mpc_datamodels import (
    FullVariableReference,
)
from agentlib_mpc.models.casadi_ml_model import CasadiMLModel
from agentlib_mpc.optimization_backends.casadi_.core.VariableGroup import (
    OptimizationQuantity,
    OptimizationVariable,
    OptimizationParameter,
)
from agentlib_mpc.optimization_backends.casadi_.basic import (
    MultipleShooting,
    CasADiBaseBackend,
)
from agentlib_mpc.optimization_backends.casadi_.full import FullSystem


[docs]class CasadiMLSystem(FullSystem): # multiple possibilities of using the MLModel # stage function for neural networks model: CasadiMLModel lags_dict: dict[str, int] sim_step: ca.Function
[docs] def initialize(self, model: CasadiMLModel, var_ref: FullVariableReference): # define variables self.states = OptimizationVariable.declare( denotation="state", variables=model.get_states(var_ref.states), ref_list=var_ref.states, assert_complete=True, ) self.controls = OptimizationVariable.declare( denotation="control", variables=model.get_inputs(var_ref.controls), ref_list=var_ref.controls, assert_complete=True, ) self.algebraics = OptimizationVariable.declare( denotation="z", variables=model.auxiliaries, ref_list=[], ) self.outputs = OptimizationVariable.declare( denotation="y", variables=model.outputs, ref_list=var_ref.outputs, ) # define parameters self.non_controlled_inputs = OptimizationParameter.declare( denotation="d", variables=model.get_inputs(var_ref.inputs), ref_list=var_ref.inputs, assert_complete=True, ) self.model_parameters = OptimizationParameter.declare( denotation="parameter", variables=model.parameters, ref_list=var_ref.parameters, ) self.initial_state = OptimizationParameter.declare( denotation="initial_state", variables=model.get_states(var_ref.states), ref_list=var_ref.states, use_in_stage_function=False, assert_complete=True, ) self.last_control = OptimizationParameter.declare( denotation="initial_control", variables=model.get_inputs(var_ref.controls), ref_list=var_ref.controls, use_in_stage_function=False, assert_complete=True, ) self.r_del_u = OptimizationParameter.declare( denotation="r_del_u", variables=[CasadiParameter(name=r_del_u) for r_del_u in var_ref.r_del_u], ref_list=var_ref.r_del_u, use_in_stage_function=False, assert_complete=True, ) self.cost_function = model.cost_func self.model_constraints = Constraint( function=ca.vertcat(*[c.function for c in model.get_constraints()]), lb=ca.vertcat(*[c.lb for c in model.get_constraints()]), ub=ca.vertcat(*[c.ub for c in model.get_constraints()]), ) self.sim_step = model.make_predict_function_for_mpc() self.model = model self.lags_dict: dict[str, int] = model.lags_dict
@property def max_lag(self) -> int: if self.lags_dict: return max(self.lags_dict.values()) else: # if there is no bb variable, we have a lag of 1 return 1
[docs] def all_system_quantities(self) -> dict[str, OptimizationQuantity]: return {var.name: var for var in self.quantities}
[docs]class MultipleShooting_ML(MultipleShooting): max_lag: int def _discretize(self, sys: CasadiMLSystem): n = self.options.prediction_horizon ts = self.options.time_step const_par = self.add_opt_par(sys.model_parameters) du_weights = self.add_opt_par(sys.r_del_u) pre_grid_states = [ts * i for i in range(-sys.max_lag + 1, 1)] inputs_lag = min(-2, -sys.max_lag) # at least -2, to consider last control pre_grid_inputs = [ts * i for i in range(inputs_lag + 1, 0)] prediction_grid = [ts * i for i in range(0, n)] # sort for debugging purposes full_grid = sorted( list(set(prediction_grid + pre_grid_inputs + pre_grid_states)) ) # dict[time, dict[denotation, ca.MX]] mx_dict: dict[float, dict[str, ca.MX]] = {time: {} for time in full_grid} # add past state variables for time in pre_grid_states: self.pred_time = time x_past = self.add_opt_par(sys.initial_state) # add past states as optimization variables with fixed values so they can # be accessed by the first few steps, when there are lags mx_dict[time][sys.states.name] = self.add_opt_var( sys.states, lb=x_past, ub=x_past, guess=x_past ) mx_dict[time][sys.initial_state.name] = x_past # add past inputs for time in pre_grid_inputs: self.pred_time = time d = sys.non_controlled_inputs mx_dict[time][d.name] = self.add_opt_par(d) u_past = self.add_opt_par(sys.last_control) mx_dict[time][sys.controls.name] = self.add_opt_var( sys.controls, lb=u_past, ub=u_past, guess=u_past ) mx_dict[time][sys.last_control.name] = u_past # add all variables over future grid for time in prediction_grid: self.pred_time = time mx_dict[time][sys.controls.name] = self.add_opt_var(sys.controls) mx_dict[time][sys.non_controlled_inputs.name] = self.add_opt_par( sys.non_controlled_inputs ) mx_dict[time][sys.algebraics.name] = self.add_opt_var(sys.algebraics) mx_dict[time][sys.outputs.name] = self.add_opt_var(sys.outputs) # create the state grid # x0 will always be the state at time 0 since the loop it is defined in starts # in the past and finishes at 0 self.pred_time = 0 for time in prediction_grid[1:]: self.pred_time = time mx_dict[time][sys.states.name] = self.add_opt_var(sys.states) self.pred_time += ts mx_dict[self.pred_time] = {sys.states.name: self.add_opt_var(sys.states)} all_quantities = sys.all_system_quantities() # add constraints and create the objective function for all stages for time in prediction_grid: stage_mx = mx_dict[time] # add penalty on control change between intervals u_prev = mx_dict[time - ts][sys.controls.name] uk = stage_mx[sys.controls.name] self.objective_function += ts * ca.dot(du_weights, (u_prev - uk) ** 2) # get stage arguments from current time step stage_arguments = { # variables sys.states.name: stage_mx[sys.states.name], sys.algebraics.name: stage_mx[sys.algebraics.name], sys.outputs.name: stage_mx[sys.outputs.name], # parameters sys.controls.name: stage_mx[sys.controls.name], sys.non_controlled_inputs.name: stage_mx[ sys.non_controlled_inputs.name ], sys.model_parameters.name: const_par, } # collect stage arguments for lagged variables for lag, denotation_dict in self._lagged_input_names.items(): for denotation, var_names in denotation_dict.items(): l_name = name_with_lag(denotation, lag) mx_list = [] for v_name in var_names: # add only the singular variable which has a lag on this level # to the stage arguments index = all_quantities[denotation].full_names.index(v_name) mx_list.append(mx_dict[time - lag * ts][denotation][index]) stage_arguments[l_name] = ca.vertcat(*mx_list) # evaluate a stage, add path constraints, multiple shooting constraints # and add to the objective function stage_result = self._stage_function(**stage_arguments) self.add_constraint( stage_result["model_constraints"], lb=stage_result["lb_model_constraints"], ub=stage_result["ub_model_constraints"], ) self.add_constraint( stage_result["next_states"] - mx_dict[time + ts][sys.states.name] ) self.objective_function += stage_result["cost_function"] * ts
[docs] def initialize(self, system: CasadiMLSystem, solver_factory: SolverFactory): """Initializes the trajectory optimization problem, creating all symbolic variables of the OCP, the mapping function and the numerical solver.""" self._construct_stage_function(system) super().initialize(system=system, solver_factory=solver_factory)
def _construct_stage_function(self, system: CasadiMLSystem): """ Combine information from the model and the var_ref to create CasADi functions which describe the system dynamics and constraints at each stage of the optimization problem. Sets the stage function. It has all mpc variables as inputs, sorted by denotation (declared in self.declare_quantities) and outputs ode, cost function and 3 outputs per constraint (constraint, lb_constraint, ub_constraint). In the basic case, it has the form: CasadiFunction: ['x', 'z', 'u', 'y', 'd', 'p'] -> ['ode', 'cost_function', 'model_constraints', 'ub_model_constraints', 'lb_model_constraints'] Args: system """ all_system_quantities = system.all_system_quantities() constraints = {"model_constraints": system.model_constraints} inputs = [ q.full_symbolic for q in all_system_quantities.values() if q.use_in_stage_function ] input_denotations = [ q.name for denotation, q in all_system_quantities.items() if q.use_in_stage_function ] # aggregate constraints constraints_func = [c.function for c in constraints.values()] constraints_lb = [c.lb for c in constraints.values()] constraints_ub = [c.ub for c in constraints.values()] constraint_denotations = list(constraints.keys()) constraint_lb_denotations = [LB_PREFIX + k for k in constraints] constraint_ub_denotations = [UB_PREFIX + k for k in constraints] # create a dictionary which holds all the inputs for the sim step of the model all_input_variables = {} lagged_inputs: dict[int, dict[str, ca.MX]] = {} # dict[lag, dict[denotation, list[var_name]]] lagged_input_names: dict[int, dict[str, list[str]]] = {} for q_name, q_obj in all_system_quantities.items(): if not q_obj.use_in_stage_function: continue for v_id, v_name in enumerate(q_obj.full_names): all_input_variables[v_name] = q_obj.full_symbolic[v_id] lag = system.lags_dict.get(v_name, 1) # if lag exists, we have to create and organize new variables for j in range(1, lag): # create an MX variable for this lag l_name = name_with_lag(v_name, j) new_lag_var = system.model.lags_mx_store[l_name] all_input_variables[l_name] = new_lag_var # add the mx variable to its lag time and denotation lagged_inputs_j = lagged_inputs.setdefault(j, {}) lv_mx = lagged_inputs_j.setdefault(q_name, ca.DM([])) lagged_inputs[j][q_name] = ca.vertcat(lv_mx, new_lag_var) # keep track of the variable names that were added lagged_input_names_j = lagged_input_names.setdefault(j, {}) lv_names = lagged_input_names_j.setdefault(q_name, []) lv_names.append(v_name) self._lagged_input_names = lagged_input_names flat_lagged_inputs = { f"{den}_{i}": mx for i, subdict in lagged_inputs.items() for den, mx in subdict.items() } all_outputs = system.sim_step(**all_input_variables) state_output_it = (all_outputs[s_name] for s_name in system.states.full_names) state_output = ca.vertcat(*state_output_it) # aggregate outputs outputs = [ state_output, system.cost_function, *constraints_func, *constraints_lb, *constraints_ub, ] output_denotations = [ "next_states", "cost_function", *constraint_denotations, *constraint_lb_denotations, *constraint_ub_denotations, ] # function describing system dynamics and cost function self._stage_function = ca.Function( "f", inputs + list(flat_lagged_inputs.values()), outputs, # input handles to make kwarg use possible and to debug input_denotations + list(flat_lagged_inputs), # output handles to make kwarg use possible and to debug output_denotations, ) def _create_lag_structure_for_denotations(self, system: CasadiMLSystem): all_system_quantities = self.all_system_quantities(system) all_input_variables = {} lagged_inputs: dict[int, dict[str, ca.MX]] = {} # dict[lag, dict[denotation, list[var_name]]] lagged_input_names: dict[int, dict[str, list[str]]] = {} for q_name, q_obj in all_system_quantities.items(): if not q_obj.use_in_stage_function: continue for v_id, v_name in enumerate(q_obj.full_names): all_input_variables[v_name] = q_obj.full_symbolic[v_id] lag = system.lags_dict.get(v_name, 1) # if lag exists, we have to create and organize new variables for j in range(1, lag): # create an MX variable for this lag l_name = name_with_lag(v_name, j) new_lag_var = ca.MX.sym(l_name) all_input_variables[l_name] = new_lag_var # add the mx variable to its lag time and denotation lagged_inputs_j = lagged_inputs.setdefault(j, {}) lv_mx = lagged_inputs_j.setdefault(q_name, ca.DM([])) lagged_inputs[j][q_name] = ca.vertcat(lv_mx, new_lag_var) # keep track of the variable names that were added lagged_input_names_j = lagged_input_names.setdefault(j, {}) lv_names = lagged_input_names_j.setdefault(q_name, []) lv_names.append(v_name) return
[docs]class CasADiBBBackend(CasADiBaseBackend): """ Class doing optimization with a MLModel. """ system_type = CasadiMLSystem discretization_types = {DiscretizationMethod.multiple_shooting: MultipleShooting_ML} system: CasadiMLSystem # a dictionary of collections of the variable lags lag_collection: Dict[str, collections.deque] = {} max_lag: int
[docs] def get_lags_per_variable(self) -> dict[str, float]: """Returns the name of variables which include lags and their lag. The MPC module can use this information to save relevant past data of lagged variables""" ts = self.config.discretization_options.time_step return { name: (lag - 1) * ts for name, lag in self.system.lags_dict.items() if name in self.var_ref }