"""Holds the classes for CasADi variables and the CasADi model."""
import abc
import json
import logging
from itertools import chain
from typing import List, Optional, Tuple, Union

import attrs
import casadi as ca
import numpy as np
import pandas as pd
from pydantic import ConfigDict, Field, PrivateAttr

from agentlib.core import Model, ModelConfig
from agentlib.core.datamodels import (
    Causality,
    ModelVariable,
    Variability,
)
from agentlib_mpc.data_structures.casadi_utils import ModelConstraint
# CasADi types accepted wherever a symbolic or numeric expression is expected.
CasadiTypes = Union[ca.MX, ca.SX, ca.DM, ca.Sparsity]

logger = logging.getLogger(__name__)

# Aliases describing constraints given as (lower bound, function, upper bound).
ca_func_inputs = Union[ca.MX, ca.SX, ca.Sparsity, ca.DM]
ca_all_inputs = Union[ca_func_inputs, np.float64, float]
ca_constraint = Tuple[ca_all_inputs, ca_func_inputs, ca_all_inputs]
ca_constraints = List[ca_constraint]
@attrs.define(slots=True, weakref_slot=False, kw_only=True)
class CasadiVariable(ModelVariable):
    """Base class for variables used in CasADi models for simulation and
    optimization.

    Implements the standard arithmetic operations, so CasadiVariables can be
    used in equations. Every operator delegates to the underlying symbolic
    variable and returns whatever CasADi returns for that operation.

    Attributes:
        sym: The symbolic CasADi variable used to define ode's and
            optimization problems.
    """

    _sym: CasadiTypes = attrs.field(default=None, alias="_sym")

    def __attrs_post_init__(self):
        self._sym = self.create_sym()

    def create_sym(self) -> ca.MX:
        """Create the symbolic MX variable belonging to this instance.

        The symbol is named after the variable and shaped after its value:
        missing and scalar values give a 1x1 symbol, one-dimensional values a
        column vector, and higher-dimensional values use their first two
        dimensions.

        Returns:
            The newly created ``ca.MX`` symbol.
        """
        value = self.value
        if value is None or isinstance(value, (float, int)):
            rows, cols = 1, 1
        else:
            shape = np.array(value).shape
            if len(shape) == 0:
                # Scalar-like values that are neither float nor int
                # (e.g. np.int64) have an empty shape; treat them as scalars
                # instead of failing on shape[0].
                rows, cols = 1, 1
            elif len(shape) == 1:
                rows, cols = shape[0], 1
            else:
                rows, cols = shape[0], shape[1]
        return ca.MX.sym(self.name, rows, cols)

    @property
    def sym(self) -> ca.MX:
        """The symbolic CasADi variable of this instance."""
        return self._sym

    def __add__(self, other):
        return self._sym + other

    def __radd__(self, other):
        return other + self._sym

    def __sub__(self, other):
        return self._sym - other

    def __rsub__(self, other):
        return other - self._sym

    def __mul__(self, other):
        return self._sym * other

    def __rmul__(self, other):
        return other * self._sym

    def __truediv__(self, other):
        return self._sym / other

    def __rtruediv__(self, other):
        return other / self._sym

    def __pow__(self, power, modulo=None):
        return self._sym**power

    def __rpow__(self, other):
        return other**self._sym

    def __abs__(self):
        return ca.fabs(self._sym)

    def __matmul__(self, other):
        # Unwrap CasadiVariables, but also accept raw operands like every
        # other arithmetic operator of this class does.
        if isinstance(other, CasadiVariable):
            other = other.sym
        return self._sym @ other

    def __neg__(self):
        return -self._sym

    # Comparisons build symbolic CasADi expressions and are only defined
    # between objects exposing a ``sym`` attribute.
    def __eq__(self, other):
        try:
            return self.sym == other.sym
        except AttributeError:
            return False

    def __le__(self, other):
        try:
            return self.sym <= other.sym
        except AttributeError as e:
            raise TypeError(
                "Cannot compare a CasadiVariable to a Non-CasadiVariable"
            ) from e

    def __lt__(self, other):
        try:
            return self.sym < other.sym
        except AttributeError as e:
            raise TypeError(
                "Cannot compare a CasadiVariable to a Non-CasadiVariable"
            ) from e

    def __ne__(self, other):
        try:
            return self.sym != other.sym
        except AttributeError:
            return True

    def __ge__(self, other):
        try:
            return self.sym >= other.sym
        except AttributeError as e:
            raise TypeError(
                "Cannot compare a CasadiVariable to a Non-CasadiVariable"
            ) from e

    def __gt__(self, other):
        try:
            return self.sym > other.sym
        except AttributeError as e:
            raise TypeError(
                "Cannot compare a CasadiVariable to a Non-CasadiVariable"
            ) from e
@attrs.define(slots=True, weakref_slot=False, kw_only=True)
class CasadiParameter(CasadiVariable):
    """Class that stores various attributes of parameters."""

    def __attrs_post_init__(self):
        # The parent hook creates the symbolic variable; without this call
        # ``sym`` would stay None for parameters.
        super().__attrs_post_init__()
        self.causality: Causality = Causality.parameter
        self.variability: Variability = Variability.tunable
@attrs.define(slots=True, weakref_slot=False, kw_only=True)
class CasadiState(CasadiVariable):
    """Class that stores various attributes of CasADi differential variables.

    A state may carry a differential equation in ``ode``. Algebraic
    assignments through ``alg`` are rejected on purpose.
    """

    _ode: Optional[CasadiTypes] = attrs.field(default=None, alias="_ode")

    def __attrs_post_init__(self):
        # The parent hook creates the symbolic variable; without this call
        # ``sym`` would stay None for states.
        super().__attrs_post_init__()
        self.causality: Causality = Causality.local
        self.variability: Variability = Variability.continuous

    @property
    def alg(self) -> CasadiTypes:
        """Always raises: states have no algebraic equation.

        Raises:
            AttributeError: On every access.
        """
        raise AttributeError(
            "Casadi States should not have .alg assignments. If you wish to provide "
            "algebraic relationships to states, add them in the constraints."
        )

    @alg.setter
    def alg(self, equation: Union[CasadiTypes, CasadiVariable]):
        raise AttributeError(
            "Casadi States should not have .alg assignments. Consider the following: \n"
            " 1. If you need equality constraints in your MPC, please add them in the "
            "constraints. \n"
            " 2. If you use this to bundle an expression, consider using a regular "
            "Python variable. \n"
            " 3. Implicit algebraic equations are currently not supported."
        )

    @property
    def ode(self) -> CasadiTypes:
        """The differential equation of this state, or None if unset."""
        return self._ode

    @ode.setter
    def ode(self, equation: Union[CasadiTypes, CasadiVariable]):
        # Plain CasadiVariables are unwrapped to their symbolic variable.
        self._ode = get_symbolic(equation)

    def json(self, indent: int = 2, **kwargs):
        """Serialize the variable to a JSON string.

        Args:
            indent: Indentation passed on to ``json.dumps``.
            **kwargs: Forwarded to ``self.dict``.

        Returns:
            The JSON representation as a string.
        """
        data = self.dict(**kwargs)
        if isinstance(self.value, pd.Series):
            # pandas Series are not JSON serializable as they are.
            data["value"] = self.value.to_dict()
        # NOTE(review): assumes self.dict() yields only JSON-serializable
        # entries (no symbolic fields) - confirm against the base class.
        return json.dumps(data, indent=indent)
@attrs.define(slots=True, weakref_slot=False, kw_only=True)
class CasadiOutput(CasadiVariable):
    """Class that stores various attributes of output variables.

    An output is defined through an algebraic equation assigned to ``alg``.
    """

    _alg: CasadiTypes = attrs.field(default=None, alias="_alg")

    def __attrs_post_init__(self):
        # The parent hook creates the symbolic variable; without this call
        # ``sym`` would stay None for outputs.
        super().__attrs_post_init__()
        self.causality: Causality = Causality.output
        self.variability: Variability = Variability.continuous

    @property
    def alg(self) -> CasadiTypes:
        """The algebraic equation defining this output, or None if unset."""
        return self._alg

    @alg.setter
    def alg(self, equation: Union[CasadiTypes, CasadiVariable]):
        # Converts CasadiVariables to their symbolic variable. Useful in case
        # CasadiVariables are assigned in equations as is, i.e. their math
        # methods are not called.
        self._alg = get_symbolic(equation)

    def json(self, indent: int = 2, **kwargs):
        """Serialize the variable to a JSON string.

        Args:
            indent: Indentation passed on to ``json.dumps``.
            **kwargs: Forwarded to ``self.dict``.

        Returns:
            The JSON representation as a string.
        """
        data = self.dict(**kwargs)
        if isinstance(self.value, pd.Series):
            # pandas Series are not JSON serializable as they are.
            data["value"] = self.value.to_dict()
        # NOTE(review): assumes self.dict() yields only JSON-serializable
        # entries (no symbolic fields) - confirm against the base class.
        return json.dumps(data, indent=indent)
class CasadiModelConfig(ModelConfig):
    """Configuration of a CasadiModel: its equations and variable lists."""

    system: CasadiTypes = None
    cost_function: CasadiTypes = None

    inputs: List[CasadiInput] = Field(default=[])
    outputs: List[CasadiOutput] = Field(default=[])
    states: List[CasadiState] = Field(default=[])
    parameters: List[CasadiParameter] = Field(default=[])
    model_config = ConfigDict(validate_assignment=True, extra="forbid")

    # Maps each variable list of the config to the class of its entries.
    _types: dict[str, type] = PrivateAttr(
        default={
            "inputs": CasadiInput,
            "outputs": CasadiOutput,
            "states": CasadiState,
            "parameters": CasadiParameter,
        }
    )
class CasadiModel(Model):
    """Base class for CasADi models.

    To implement your own model, inherit from this class, specify the
    variables (inputs, outputs, states, parameters) and override the
    setup_system() method.
    """

    config: CasadiModelConfig

    def __init__(self, **kwargs):
        # Initializes the config
        super().__init__(**kwargs)

        self.constraints = []  # constraint functions
        # read constraints, assign ode's and return cost function
        self.cost_func = self.setup_system()
        # Fail early with a clear message instead of a type error later, when
        # the output equations are assembled.
        self._assert_outputs_are_defined()

        # save system equations as a single casadi vector
        system = ca.vertcat(*[sta.ode for sta in self.differentials])
        # prevents errors in case system is empty
        self.system = ca.reshape(system, system.shape[0], 1)
        self.integrator = None  # set in initialize()

    def _assert_outputs_are_defined(self):
        """Raises an Error, if the output variables are not defined with an equation"""
        for out in self.outputs:
            if out.alg is None:
                raise ValueError(
                    f"Output '{out.name}' was not initialized with an equation. Make "
                    f"sure you specify '{out.name}.alg' in 'setup_system()'."
                )

    def do_step(self, *, t_start, t_sample=None):
        """Simulate the model for ``t_sample`` and store the results.

        The integrator covers one interval of length ``self.dt`` per call, so
        it is invoked repeatedly until ``t_sample`` is reached.

        Args:
            t_start: Start time of the step. Not used by this implementation.
            t_sample: Length of the step. Defaults to ``self.dt``.
        """
        if t_sample is None:
            t_sample = self.dt
        pars = self.get_input_values()

        if self.differentials:
            curr_x = self.get_differential_values()
            result = None
            t_sim = 0
            while t_sim < t_sample:
                result = self.integrator(x0=curr_x, p=pars)
                t_sim += self.dt
                curr_x = result["xf"]
            if result is None:
                # Non-positive step length: nothing was integrated.
                return
            # NOTE(review): assumes scalar states, so that the flattened
            # result lines up one-to-one with the differentials - confirm.
            self.set_differential_values(np.array(curr_x).flatten())
        else:
            result = self.integrator(p=pars)

        if self.outputs:
            self.set_output_values(np.array(result["zf"]).flatten())

    def _make_integrator(self) -> ca.Function:
        """Creates the integrator to be used in do_step(). The integrator takes the
        current state and input values as input and returns the state values and
        algebraic values at the end of the interval."""
        opts = {"t0": 0, "tf": self.dt}
        par = ca.vertcat(
            *[inp.sym for inp in chain.from_iterable([self.inputs, self.parameters])]
        )
        x = ca.vertcat(*[sta.sym for sta in self.differentials])
        z = ca.vertcat(*[var.sym for var in self.outputs])
        algebraic_equations = ca.vertcat(*self.output_equations)

        if not algebraic_equations.shape[0] and self.differentials:
            # case of pure ode
            ode = {"x": x, "p": par, "ode": self.system}
            integrator = ca.integrator("system", "cvodes", ode, opts)
        elif algebraic_equations.shape[0] and self.differentials:
            # mixed dae
            dae = {
                "x": x,
                "p": par,
                "ode": self.system,
                "z": z,
                "alg": algebraic_equations,
            }
            integrator = ca.integrator("system", "idas", dae, opts)
        else:
            # only algebraic equations
            dae = {
                "x": ca.MX.sym("dummy", 1),
                "p": par,
                "ode": 0,
                "z": z,
                "alg": algebraic_equations,
            }
            integrator_ = ca.integrator("system", "idas", dae, opts)
            # Hide the dummy state, so callers only pass the parameters.
            integrator = ca.Function(
                "system", [par], [integrator_(x0=0, p=par)["zf"]], ["p"], ["zf"]
            )
        return integrator

    def initialize(self, **ignored):
        """Initializes the CasADi model by creating the integrator to be used
        in do_step(). The integrator takes the current state and input values
        as input and returns the state values and algebraic values at the end
        of the interval.
        """
        self.integrator = self._make_integrator()

    def get_constraints(self) -> List[ModelConstraint]:
        """List of constraints of the form (lower, function, upper)."""
        # Multiplying by 1 sends CasadiVariables through their arithmetic
        # operators, which return the underlying CasADi expression.
        base_constraints = [
            ModelConstraint(lb * 1, func * 1, ub * 1)
            for lb, func, ub in self.constraints
        ]
        equality_constraints = [
            ModelConstraint(0, alg, 0) for alg in self.output_equations
        ]
        return base_constraints + equality_constraints

    @property
    def inputs(self) -> list[CasadiInput]:
        """Get all model inputs as a list"""
        return list(self._inputs.values())

    @property
    def outputs(self) -> list[CasadiOutput]:
        """Get all model outputs as a list"""
        return list(self._outputs.values())

    @property
    def states(self) -> list[CasadiState]:
        """Get all model states as a list"""
        return list(self._states.values())

    @property
    def parameters(self) -> list[CasadiParameter]:
        """Get all model parameters as a list"""
        return list(self._parameters.values())

    @property
    def output_equations(self) -> List[CasadiTypes]:
        """List of algebraic equations RHS in the form
        0 = z - g(x, z, p, ... )"""
        return [alg_var - alg_var.alg for alg_var in self.outputs]

    @property
    def differentials(self) -> List[CasadiState]:
        """List of all CasadiStates with an associated differential equation."""
        return [var for var in self.states if var.ode is not None]

    @property
    def auxiliaries(self) -> List[CasadiState]:
        """List of all CasadiStates without an associated equation. Common
        uses for this are slack variables that appear in cost functions and
        constraints of optimization models."""
        return [var for var in self.states if var.ode is None]

    @abc.abstractmethod
    def setup_system(self):
        """Define the model equations. Must be overridden by subclasses.

        The return value is stored as the cost function of the model.
        """
        raise NotImplementedError(
            "The ode is defined by the actual models " "inheriting from this class."
        )

    def get_differential_values(self):
        """Current values of all differential states, stacked vertically."""
        return ca.vertcat(*[sta.value for sta in self.differentials])

    def set_differential_values(self, values: Union[List, np.ndarray]):
        """Sets the values for all differential variables. Provided values list MUST
        match the order in which differentials are saved, there is no check."""
        for state, value in zip(self.differentials, values):
            self._states[state.name].value = value

    def set_output_values(self, values: Union[List, np.ndarray]):
        """Sets the values for all outputs. Provided values list MUST match the order
        in which outputs are saved, there is no check."""
        for var, value in zip(self.outputs, values):
            self._outputs[var.name].value = value

    def get(self, name: str) -> CasadiVariable:
        """Return the model variable with the given name."""
        return super().get(name)

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        # todo
def get_symbolic(equation):
    """Return the CasADi expression behind ``equation``.

    CasadiVariables are replaced by their symbolic variable, which is useful
    when they are assigned in equations as is, i.e. without any of their math
    methods being called. Anything else is handed back unchanged.
    """
    return equation.sym if isinstance(equation, CasadiVariable) else equation