from __future__ import annotations

import abc
from enum import Enum
from typing import Union, TYPE_CHECKING

import casadi as ca
import numpy as np
from keras import layers

from agentlib_mpc.models.serialized_ml_model import (
    SerializedMLModel,
    SerializedLinReg,
    SerializedGPR,
    SerializedANN,
    MLModels,
)

if TYPE_CHECKING:
    from keras import Sequential
    from sklearn.linear_model import LinearRegression

    from agentlib_mpc.models.serialized_ml_model import CustomGPR


class CasadiPredictor(abc.ABC):
    """
    Protocol for a generic CasADi implementation of various ML-model-based
    predictors.

    Attributes:
        serialized_model: Serialized model which is translated to a CasADi model.
        predictor_model: Predictor model from another library, which is
            translated to CasADi syntax.
        sym_input: Symbolic input of the predictor with the required input shape.
        prediction_function: Symbolic CasADi prediction function of the given
            model.
    """

    class Config:
        arbitrary_types_allowed = True
def __init__(self, serialized_model: SerializedMLModel) -> None:
"""Initialize Predictor class."""
self.serialized_model: SerializedMLModel = serialized_model
self.predictor_model: Union[Sequential, CustomGPR, LinearRegression] = (
serialized_model.deserialize()
)
self.sym_input: ca.MX = self._get_sym_input()
self.prediction_function: ca.Function = self._build_prediction_function()

    @classmethod
    def from_serialized_model(cls, serialized_model: SerializedMLModel):
        """Initialize the matching predictor subclass for a serialized model."""
        model_type = serialized_model.model_type
        # todo: return type[cls]
        return casadi_predictors[model_type](serialized_model)
@property
@abc.abstractmethod
def input_shape(self) -> tuple[int, int]:
"""Input shape of Predictor."""
pass
@property
def output_shape(self) -> tuple[int, int]:
"""Output shape of Predictor."""
return 1, len(self.serialized_model.output)
def _get_sym_input(self):
"""Returns symbolical input object in the required shape."""
return ca.MX.sym("input", 1, self.input_shape[1])
@abc.abstractmethod
def _build_prediction_function(self) -> ca.Function:
"""Build the prediction function with casadi and a symbolic input."""
pass

    def predict(self, x: Union[np.ndarray, ca.MX]) -> Union[ca.DM, ca.MX]:
        """
        Evaluate the prediction function with input data.

        Args:
            x: Input data.

        Returns:
            Result of evaluating the prediction function with the input data.
        """
        return self.prediction_function(x)
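
# A minimal usage sketch (illustrative only; `serialized` stands for a
# SerializedMLModel instance obtained elsewhere, e.g. loaded from disk):
#
#     predictor = CasadiPredictor.from_serialized_model(serialized)
#     x = np.ones(predictor.input_shape)   # shape (1, n_features)
#     y = predictor.predict(x)             # ca.DM of shape (1, n_outputs)
#
# Because prediction_function is a ca.Function, `x` may equally be a symbolic
# ca.MX expression, which lets the model be embedded in an MPC problem.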


class CasadiLinReg(CasadiPredictor):
    """
    Generic CasADi implementation of scikit-learn LinearRegression.
    """

    def __init__(self, serialized_model: SerializedLinReg) -> None:
        """
        Initializes the CasadiLinReg predictor.

        Args:
            serialized_model: SerializedLinReg object.
        """
        super().__init__(serialized_model)
@property
def input_shape(self) -> tuple[int, int]:
"""Input shape of Predictor."""
return 1, self.predictor_model.coef_.shape[1]
def _build_prediction_function(self) -> ca.Function:
"""Build the prediction function with casadi and a symbolic input."""
intercept = self.predictor_model.intercept_
coef = self.predictor_model.coef_
function = intercept + ca.mtimes(self.sym_input, coef.T)
return ca.Function("forward", [self.sym_input], [function])
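
# The function above reproduces sklearn's affine map y = x @ coef_.T +
# intercept_. A quick consistency check (sketch; assumes a fitted LinearRegression
# `reg` with a two-dimensional coef_ and its serialized counterpart `serialized`):
#
#     x = np.random.rand(1, reg.coef_.shape[1])
#     assert np.allclose(reg.predict(x), CasadiLinReg(serialized).predict(x))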


class CasadiGPR(CasadiPredictor):
    """
    Generic CasADi implementation of the scikit-learn GaussianProcessRegressor.
    """

    def __init__(self, serialized_model: SerializedGPR) -> None:
        super().__init__(serialized_model)
@property
def input_shape(self) -> tuple[int, int]:
"""Input shape of Predictor."""
return 1, self.predictor_model.X_train_.shape[1]
def _build_prediction_function(self) -> ca.Function:
"""Build the prediction function with casadi and a symbolic input."""
normalize = self.predictor_model.data_handling.normalize
scale = self.predictor_model.data_handling.scale
alpha = self.predictor_model.alpha_
if normalize:
normalized_inp = self._normalize(self.sym_input)
k_star = self._kernel(normalized_inp)
else:
k_star = self._kernel(self.sym_input)
f_mean = ca.mtimes(k_star.T, alpha) * scale
return ca.Function("forward", [self.sym_input], [f_mean])
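
    # Note on the math above: after fitting, sklearn stores the dual
    # coefficients alpha_ = K(X_train, X_train)^{-1} y_train (computed on the
    # possibly normalized targets), so the posterior mean reduces to the dot
    # product f* = k(x*, X_train)^T @ alpha_, which is then rescaled by the
    # data handler's scale factor.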

    def _kernel(
        self,
        x_test: ca.MX,
    ) -> ca.MX:
        """
        Evaluates the RBF kernel between the test input and the stored
        training data X_train_. Assumes the fitted kernel is structured as
        ConstantKernel * RBF (optionally summed with a noise term), so the
        hyperparameters are reachable via kernel_.k1.

        shape(x_test) = (n_test_samples, n_features)
        """
        square_distance = self._square_distance(x_test)
        length_scale = self.predictor_model.kernel_.k1.k2.length_scale
        constant_value = self.predictor_model.kernel_.k1.k1.constant_value
        return ca.exp(-square_distance / (2 * length_scale**2)) * constant_value

    def _square_distance(self, inp: ca.MX):
        """
        Calculates the squared distances between x_test and all rows of x_train.

        shape(x_test) = (n_test_samples, n_features)
        shape(x_train) = (n_train_samples, n_features)
        """
        x_train = self.predictor_model.X_train_
        self._check_shapes(inp, x_train)
        a = ca.sum2(inp**2)
        b = np.sum(x_train**2, axis=1, dtype=float).reshape(-1, 1)
        c = -2 * ca.mtimes(x_train, inp.T)
        return a + b + c
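
    # The three terms above implement the binomial expansion
    #     ||x_train_j - x_test||^2 = ||x_test||^2 + ||x_train_j||^2
    #                                - 2 <x_train_j, x_test>,
    # which yields an (n_train_samples, 1) column of squared distances without
    # building the pairwise difference matrix symbolically.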

    def _normalize(self, x: ca.MX):
        mean = self.predictor_model.data_handling.mean
        std = self.predictor_model.data_handling.std
        if mean is None or std is None:
            raise ValueError("Mean and std are not valid.")
        return (x - ca.DM(mean).T) / ca.DM(std).T

    def _check_shapes(self, x_test: Union[ca.MX, np.ndarray], x_train: np.ndarray):
        if x_test.shape[1] != x_train.shape[1]:
            raise ValueError(
                f"The feature dimensions of x_test {x_test.shape} and "
                f"x_train {x_train.shape} must match."
            )
###################################
### ANN ###
###################################


class ANNLayerTypes(str, Enum):
    DENSE = "dense"
    FLATTEN = "flatten"
    BATCHNORMALIZATION = "batch_normalization"
    LSTM = "lstm"
    RESCALING = "rescaling"


class Layer:
    """
    Single layer of an artificial neural network.
    """

    def __init__(self, layer: layers.Layer):
self.config = layer.get_config()
# name
if "name" in self.config:
self.name = self.config["name"]
# units
if "units" in self.config:
self.units = self.config["units"]
# activation function
if "activation" in self.config:
self.activation = self.get_activation(layer.get_config()["activation"])
# input / output shape
self.input_shape = layer.input.shape[1:]
self.output_shape = layer.output.shape[1:]
# update the dimensions to two dimensions
self.update_dimensions()
# symbolic input layer
self.input_layer = ca.MX.sym(
"input_layer", self.input_shape[0], self.input_shape[1]
)
def __str__(self):
ret = ""
if hasattr(self, "units"):
ret += f"\tunits:\t\t\t\t{self.units}\n"
if hasattr(self, "activation"):
ret += f"\tactivation:\t\t\t{self.activation.__str__()}\n"
if hasattr(self, "recurrent_activation"):
ret += f"\trec_activation:\t\t{self.recurrent_activation.__str__()}\n"
ret += f"\tinput_shape:\t\t{self.input_shape}\n"
ret += f"\toutput_shape:\t\t{self.output_shape}\n"
return ret

    def update_dimensions(self):
        """
        CasADi only works with two-dimensional arrays, so the stored input and
        output shapes must be expanded accordingly.
        """
        if len(self.input_shape) == 1:
            self.input_shape = (1, self.input_shape[0])
        elif len(self.input_shape) == 2:
            self.input_shape = (self.input_shape[0], self.input_shape[1])
        else:
            raise ValueError("Please check input dimensions.")
        if len(self.output_shape) == 1:
            self.output_shape = (1, self.output_shape[0])
        elif len(self.output_shape) == 2:
            self.output_shape = (self.output_shape[0], self.output_shape[1])
        else:
            raise ValueError("Please check output dimensions.")

    @staticmethod
    def get_activation(function: str):
        blank = ca.MX.sym("blank")
        if function == "sigmoid":
            return ca.Function(function, [blank], [1 / (1 + ca.exp(-blank))])
        elif function == "tanh":
            return ca.Function(function, [blank], [ca.tanh(blank)])
        elif function == "relu":
            return ca.Function(function, [blank], [ca.fmax(0, blank)])
        elif function == "softplus":
            return ca.Function(function, [blank], [ca.log(1 + ca.exp(blank))])
        elif function == "gaussian":
            return ca.Function(function, [blank], [ca.exp(-(blank**2))])
        elif function == "linear":
            return ca.Function(function, [blank], [blank])
        else:
            raise ValueError(f"Unknown activation function: {function}")


class Dense(Layer):
    """
    Fully connected layer.
    """

    def __init__(self, layer: layers.Dense):
        super().__init__(layer)
        # weights and biases
        self.weights, self.biases = layer.get_weights()
        self.biases = self.biases.reshape(1, self.biases.shape[0])
        # check input dimension
        if self.input_shape[1] != self.weights.shape[0]:
            raise ValueError(
                f"Please check the input dimensions of this layer. "
                f"Layer with error: {self.name}"
            )

    def forward(self, input):
        # forward pass: affine transform followed by the activation
        return self.activation(input @ self.weights + self.biases)


class Flatten(Layer):
    def forward(self, input):
        # flatten the input row by row (row-major, matching keras' Flatten)
        f = input[0, :]
        for row in range(1, input.shape[0]):
            f = ca.horzcat(f, input[row, :])
        return f


class BatchNormalization(Layer):
    """
    Batch normalization layer. Make sure the axis setting is set to two.
    """

    def __init__(self, layer: layers.BatchNormalization):
        super(BatchNormalization, self).__init__(layer)
        # weights and biases, tiled to the layer's two-dimensional input shape
        self.gamma = np.vstack([layer.get_weights()[0]] * self.input_shape[0])
        self.beta = np.vstack([layer.get_weights()[1]] * self.input_shape[0])
        self.mean = np.vstack([layer.get_weights()[2]] * self.input_shape[0])
        self.var = np.vstack([layer.get_weights()[3]] * self.input_shape[0])
        self.epsilon = layer.get_config()["epsilon"]
        # check dimensions
        if self.input_shape != self.gamma.shape:
            axis = self.config["axis"][0]
            raise ValueError(f"Dimension mismatch. Normalized axis: {axis}")
        # symbolic input layer
        self.input_layer = ca.MX.sym(
            "input_layer", self.input_shape[0], self.input_shape[1]
        )

    def forward(self, input):
        # forward pass: y = gamma * (x - mean) / sqrt(var + eps) + beta
        f = (input - self.mean) / (
            ca.sqrt(self.var + self.epsilon)
        ) * self.gamma + self.beta
        return f


class LSTM(Layer):
    """
    Long short-term memory cell.
    """

    def __init__(self, layer: layers.LSTM):
        super(LSTM, self).__init__(layer)
        # recurrent activation
        self.recurrent_activation = self.get_activation(
            layer.get_config()["recurrent_activation"]
        )
        # load weights and biases
        W = layer.get_weights()[0]
        U = layer.get_weights()[1]
        b = layer.get_weights()[2]
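        # keras stores the kernel W with shape (n_features, 4 * units), the
        # recurrent kernel U with shape (units, 4 * units) and the bias b with
        # shape (4 * units,), with the four gates concatenated in the order
        # [input, forget, cell, output]; the slices below unpack them.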
# weights (kernel)
self.W_i = W[:, : self.units]
self.W_f = W[:, self.units : self.units * 2]
self.W_c = W[:, self.units * 2 : self.units * 3]
self.W_o = W[:, self.units * 3 :]
# weights (recurrent kernel)
self.U_i = U[:, : self.units]
self.U_f = U[:, self.units : self.units * 2]
self.U_c = U[:, self.units * 2 : self.units * 3]
self.U_o = U[:, self.units * 3 :]
# biases
        self.b_i = np.expand_dims(b[: self.units], axis=0)
        self.b_f = np.expand_dims(b[self.units : self.units * 2], axis=0)
        self.b_c = np.expand_dims(b[self.units * 2 : self.units * 3], axis=0)
        self.b_o = np.expand_dims(b[self.units * 3 :], axis=0)
        # initial memory and output
        self.h_0 = np.zeros((1, self.units))
        self.c_0 = np.zeros((1, self.units))

    def forward(self, input):
        # check input shape
        if input.shape != self.input_shape:
            raise ValueError("Dimension mismatch!")
        # initial memory and output
        c = self.c_0
        h = self.h_0
        # number of time steps
        steps = self.input_shape[0]
        # forward pass
        for i in range(steps):
            # input for the current step
            x = input[i, :]
            # calculate memory (c) and output (h)
            c, h = self.step(x, c, h)
        # return the hidden state of the last step
        return h

    def step(self, x_t, c_prev, h_prev):
        # gates
        i_t = self.recurrent_activation(x_t @ self.W_i + h_prev @ self.U_i + self.b_i)
        f_t = self.recurrent_activation(x_t @ self.W_f + h_prev @ self.U_f + self.b_f)
        o_t = self.recurrent_activation(x_t @ self.W_o + h_prev @ self.U_o + self.b_o)
        c_t = self.activation(x_t @ self.W_c + h_prev @ self.U_c + self.b_c)
        # memory and output
        c_next = f_t * c_prev + i_t * c_t
        h_next = o_t * self.activation(c_next)
        return c_next, h_next
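
# For reference, step() implements the standard LSTM update:
#     i_t  = sig(x_t W_i + h_{t-1} U_i + b_i)    (input gate)
#     f_t  = sig(x_t W_f + h_{t-1} U_f + b_f)    (forget gate)
#     o_t  = sig(x_t W_o + h_{t-1} U_o + b_o)    (output gate)
#     c~_t = act(x_t W_c + h_{t-1} U_c + b_c)    (cell candidate)
#     c_t  = f_t * c_{t-1} + i_t * c~_t
#     h_t  = o_t * act(c_t)
# where sig/act are the recurrent and regular activations from the keras
# config (sigmoid and tanh by default).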


class CasadiANN(CasadiPredictor):
    """
    Generic implementation of sequential Keras models in CasADi.
    """

    def __init__(self, serialized_model: SerializedANN):
        """
        Supported layers:
            - Dense (fully connected layer)
            - Flatten (reduces the input dimension to 1)
            - BatchNormalization (normalization)
            - LSTM (recurrent cell)
            - Rescaling

        Args:
            serialized_model: SerializedANN model.
        """
        super().__init__(serialized_model)
@property
def input_shape(self) -> tuple[int, int]:
"""Input shape of Predictor."""
return 1, self.predictor_model.input_shape[1]

    def _build_prediction_function(self) -> ca.Function:
        """Build the prediction function with casadi and a symbolic input."""
        keras_layers = list(self.predictor_model.layers)
        casadi_layers = []
        for keras_layer in keras_layers:
            name = keras_layer.get_config()["name"]
            for layer_type in ANNLayerTypes:
                if layer_type.value in name:
                    casadi_layers.append(
                        ann_layer_types[layer_type.value](keras_layer)
                    )
                    break
        function = self.sym_input
        for casadi_layer in casadi_layers:
            function = casadi_layer.forward(function)
        return ca.Function("forward", [self.sym_input], [function])
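
# Layer dispatch note: keras auto-names layers like "dense_1" or "lstm_3", so
# the substring test in _build_prediction_function maps each keras layer to
# its CasADi counterpart via ann_layer_types below; an enum member without an
# entry in that dict (currently RESCALING) would raise a KeyError if matched.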
ann_layer_types = {
ANNLayerTypes.DENSE: Dense,
ANNLayerTypes.FLATTEN: Flatten,
ANNLayerTypes.BATCHNORMALIZATION: BatchNormalization,
ANNLayerTypes.LSTM: LSTM,
}
casadi_predictors = {
MLModels.ANN: CasadiANN,
MLModels.GPR: CasadiGPR,
MLModels.LINREG: CasadiLinReg,
}