import abc
import json
import logging
import subprocess
import numpy as np
from enum import Enum
from copy import deepcopy
from keras import Sequential
from pathlib import Path
from pydantic import ConfigDict, Field, BaseModel
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import ConstantKernel, WhiteKernel, RBF
from sklearn.linear_model import LinearRegression
from typing import Union, Optional
from agentlib_mpc.data_structures.ml_model_datatypes import OutputFeature, Feature
logger = logging.getLogger(__name__)
[docs]def get_git_revision_short_hash() -> str:
return (
subprocess.check_output(["git", "rev-parse", "--short", "HEAD"])
.decode("ascii")
.strip()
)
[docs]class MLModels(str, Enum):
ANN = "ANN"
GPR = "GPR"
LINREG = "LinReg"
[docs]class SerializedMLModel(BaseModel, abc.ABC):
dt: Union[float, int] = Field(
title="dt",
description="The length of time step of one prediction of Model in seconds.",
)
input: dict[str, Feature] = Field(
default=None,
title="input",
description="Model input variables with their lag order.",
)
output: dict[str, OutputFeature] = Field(
default=None,
title="output",
description="Model output variables (which are automatically also inputs, as "
"we need them recursively in MPC.) with their lag order.",
)
agentlib_mpc_hash: str = Field(
default_factory=get_git_revision_short_hash,
description="The commit hash of the agentlib_mpc version this was created with.",
)
training_info: Optional[dict] = Field(
default=None,
title="Training Info",
description="Config of Trainer class with all the meta data used for training of the Model.",
)
model_type: MLModels
model_config = ConfigDict(protected_namespaces=())
[docs] @classmethod
@abc.abstractmethod
def serialize(
cls,
model: Union[Sequential, GaussianProcessRegressor, LinearRegression],
dt: Union[float, int],
input: dict[str, Feature],
output: dict[str, OutputFeature],
training_info: Optional[dict] = None,
):
"""
Args:
model: Machine Learning Model.
dt: The length of time step of one prediction of Model in seconds.
input: Model input variables with their lag order.
output: Model output variables (which are automatically also inputs, as
we need them recursively in MPC.) with their lag order.
training_info: Config of Trainer Class, which trained the Model.
Returns:
SerializedMLModel version of the passed ML Model.
"""
pass
[docs] @abc.abstractmethod
def deserialize(self):
"""
Deserializes SerializedMLModel object and returns a specific Machine Learning Model object.
Returns:
MLModel: Machine Learning Model.
"""
pass
[docs] def save_serialized_model(self, path: Path):
"""
Saves MLModel object as json string.
Args:
path: relative/absolute path which determines where the json will be saved.
"""
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
f.write(self.model_dump_json())
# with open(path, "w") as json_file:
# json_file.write(self.model_dump_json())
# Displays the file path under which the json file has been saved.
logger.info(f"Model has been saved under the following path: {path}")
[docs] @classmethod
def load_serialized_model_from_file(cls, path: Path):
"""
Loads SerializedMLModel object from a json file and creates a new specific Machine Learning Model object
which is returned.
Args:
path: relative/absolute path which determines which json file will be loaded.
Returns:
SerializedMLModel object with data from json file.
"""
with open(path, "r") as json_file:
model_data = json.load(json_file)
return cls.load_serialized_model_from_dict(model_data)
[docs] @classmethod
def load_serialized_model_from_string(cls, json_string: str):
"""
Loads SerializedMLModel object from a json string and creates a new specific Machine Learning Model object
which is returned.
Args:
json_string: json string which will be loaded.
Returns:
SerializedMLModel object with data from json file.
"""
model_data = json.loads(json_string)
return cls.load_serialized_model_from_dict(model_data)
[docs] @classmethod
def load_serialized_model_from_dict(cls, model_data: dict):
"""
Loads SerializedMLModel object from a dict and creates a new specific Machine Learning Model object
which is returned.
Args:
json_string: json string which will be loaded.
Returns:
SerializedMLModel object with data from json file.
"""
model_type = model_data["model_type"]
return serialized_models[model_type](**model_data)
[docs] @classmethod
def load_serialized_model(cls, model_data: Union[dict, str, Path]):
"""Loads the ML model from a source"""
if isinstance(model_data, dict):
return cls.load_serialized_model_from_dict(model_data)
if isinstance(model_data, (str, Path)):
if Path(model_data).exists():
return cls.load_serialized_model_from_file(model_data)
return cls.load_serialized_model_from_string(model_data)
[docs]class SerializedANN(SerializedMLModel):
"""
Contains Keras ANN in serialized form and offers functions to transform
Keras Sequential ANNs to SerializedANN objects (from_ANN) and vice versa (deserialize).
attributes:
structure: architecture/structure of ANN saved as json string.
weights: weights and biases of all layers saved as lists of np.ndarrays.
"""
weights: list[list] = Field(
default=None,
title="weights",
description="The weights of the ANN.",
)
structure: str = Field(
default=None,
title="structure",
description="The structure of the ANN as json string.",
)
model_config = ConfigDict(arbitrary_types_allowed=True)
model_type: MLModels = MLModels.ANN
[docs] @classmethod
def serialize(
cls,
model: Sequential,
dt: Union[float, int],
input: dict[str, Feature],
output: dict[str, OutputFeature],
training_info: Optional[dict] = None,
):
"""Serializes Keras Sequential ANN and returns SerializedANN object"""
structure = model.to_json()
weights = []
for layer in model.layers:
weight_l = layer.get_weights()
for idx in range(len(weight_l)):
weight_l[idx] = weight_l[idx].tolist()
weights.append(weight_l)
return cls(
structure=structure,
weights=weights,
dt=dt,
input=input,
output=output,
trainer_config=training_info,
)
[docs] def deserialize(self) -> Sequential:
"""Deserializes SerializedANN object and returns a Keras Sequential ANN."""
from keras import models
ann = models.model_from_json(self.structure)
layer_weights = []
for layer in self.weights:
l_weight = []
layer_weights.append(l_weight)
for matrix in layer:
l_weight.append(np.asarray(matrix))
for i, layer in enumerate(ann.layers):
layer.set_weights(layer_weights[i])
return ann
[docs] def to_dict(self) -> dict:
"""Transforms self to a dictionary and the numpy arrays to lists, so they can
be serialized."""
ann_dict = deepcopy(self.__dict__)
for layer in ann_dict["weights"]:
for idx in range(0, len(layer)):
layer[idx] = layer[idx].tolist()
return ann_dict
[docs]class GPRDataHandlingParameters(BaseModel):
normalize: bool = Field(
default=False,
title="normalize",
description="Boolean which defines whether the input data will be normalized or not.",
)
scale: float = Field(
default=1.0,
title="scale",
description="Number by which the y vector is divided before training and multiplied after evaluation.",
)
mean: Optional[list] = Field(
default=None,
title="mean",
description="Mean values of input data for normalization. None if normalize equals to False.",
)
std: Optional[list] = Field(
default=None,
title="standard deviation",
description="Standard deviation of input data for normalization. None if normalize equals to False.",
)
[docs]class CustomGPR(GaussianProcessRegressor):
"""
Extends scikit-learn GaussianProcessRegressor with normalizing and scaling option
by adding the attribute data_handling, customizing the predict function accordingly
and adding a normalize function.
"""
def __init__(
self,
kernel=None,
*,
alpha=1e-10,
optimizer="fmin_l_bfgs_b",
n_restarts_optimizer=0,
normalize_y=False,
copy_X_train=True,
random_state=None,
data_handling=GPRDataHandlingParameters(),
):
super().__init__(
kernel=kernel,
alpha=alpha,
optimizer=optimizer,
n_restarts_optimizer=n_restarts_optimizer,
normalize_y=normalize_y,
copy_X_train=copy_X_train,
random_state=random_state,
)
self.data_handling: GPRDataHandlingParameters = data_handling
[docs] def predict(self, X, return_std=False, return_cov=False):
"""
Overwrite predict method of GaussianProcessRegressor to include normalization.
"""
if self.data_handling.normalize:
X = self._normalize(X)
return super().predict(X, return_std, return_cov)
def _normalize(self, x: np.ndarray):
mean = self.data_handling.mean
std = self.data_handling.std
if mean is None and std is not None:
raise ValueError("Mean and std are not valid.")
return (x - mean) / std
[docs]class GPRKernelParameters(BaseModel):
constant_value: float = Field(
default=1.0,
title="constant value",
description="The constant value which defines the covariance: k(x_1, x_2) = constant_value.",
)
constant_value_bounds: Union[tuple, str] = Field(
default=(1e-5, 1e5),
title="constant value bounds",
description="The lower and upper bound on constant_value. If set to “fixed”, "
"constant_value cannot be changed during hyperparameter tuning.",
)
length_scale: Union[float, list] = Field(
default=1.0,
title="length_scale",
description="The length scale of the kernel. If a float, an isotropic kernel "
"is used. If an array, an anisotropic kernel is used where each "
"dimension of l defines the length-scale of the respective feature "
"dimension.",
)
length_scale_bounds: Union[tuple, str] = Field(
default=(1e-5, 1e5),
title="length_scale_bounds",
description="The lower and upper bound on ‘length_scale’. If set to “fixed”, "
"‘length_scale’ cannot be changed during hyperparameter tuning.",
)
noise_level: float = Field(
default=1.0,
title="noise level",
description="Parameter controlling the noise level (variance).",
)
noise_level_bounds: Union[tuple, str] = Field(
default=(1e-5, 1e5),
title="noise level bounds",
description="The lower and upper bound on ‘noise_level’. If set to “fixed”, "
"‘noise_level’ cannot be changed during hyperparameter tuning.",
)
theta: list = Field(
title="theta",
description="Returns the (flattened, log-transformed) non-fixed gpr_parameters.",
)
model_config = ConfigDict(arbitrary_types_allowed=True)
[docs] @classmethod
def from_model(cls, model: CustomGPR) -> "GPRKernelParameters":
return cls(
constant_value=model.kernel_.k1.k1.constant_value,
constant_value_bounds=model.kernel_.k1.k1.constant_value_bounds,
length_scale=model.kernel_.k1.k2.length_scale,
length_scale_bounds=model.kernel_.k1.k2.length_scale_bounds,
noise_level=model.kernel_.k2.noise_level,
noise_level_bounds=model.kernel_.k2.noise_level_bounds,
theta=model.kernel_.theta.tolist(),
)
[docs]class GPRParameters(BaseModel):
alpha: Union[float, list] = Field(
default=1e-10,
title="alpha",
description="Value added to the diagonal of the kernel matrix during fitting. "
"This can prevent a potential numerical issue during fitting, by "
"ensuring that the calculated values form a positive definite matrix. "
"It can also be interpreted as the variance of additional Gaussian "
"measurement noise on the training observations. Note that this is "
"different from using a WhiteKernel. If an array is passed, it must "
"have the same number of entries as the data used for fitting and is "
"used as datapoint-dependent noise level. Allowing to specify the "
"noise level directly as a parameter is mainly for convenience and "
"for consistency with Ridge.",
)
L: list = Field(
title="L",
description="Lower-triangular Cholesky decomposition of the kernel in X_train.",
)
X_train: list = Field(
title="X_train",
description="Feature vectors or other representations of training data (also "
"required for prediction).",
)
y_train: list = Field(
title="y_train",
description="Target values in training data (also required for prediction).",
)
n_features_in: int = Field(
title="number of input features",
description="Number of features seen during fit.",
)
log_marginal_likelihood_value: float = Field(
title="log marginal likelihood value",
description="The log-marginal-likelihood of self.kernel_.theta.",
)
model_config = ConfigDict(arbitrary_types_allowed=True)
[docs] @classmethod
def from_model(cls, model: CustomGPR) -> "GPRParameters":
return cls(
alpha=model.alpha_.tolist(),
L=model.L_.tolist(),
X_train=model.X_train_.tolist(),
y_train=model.y_train_.tolist(),
n_features_in=model.n_features_in_,
log_marginal_likelihood_value=model.log_marginal_likelihood_value_,
)
[docs]class SerializedGPR(SerializedMLModel):
"""
Contains scikit-learn GaussianProcessRegressor and its Kernel and provides functions to transform
these to SerializedGPR objects and vice versa.
Attributes:
"""
data_handling: GPRDataHandlingParameters = Field(
default=None,
title="data_handling",
description="Information about data handling for GPR.",
)
kernel_parameters: GPRKernelParameters = Field(
default=None,
title="kernel parameters",
description="Parameters of kernel of the fitted GPR.",
)
gpr_parameters: GPRParameters = Field(
default=None,
title="gpr_parameters",
description=" GPR parameters of GPR and its Kernel and Data of fitted GPR.",
)
model_config = ConfigDict(arbitrary_types_allowed=True)
model_type: MLModels = MLModels.GPR
[docs] @classmethod
def serialize(
cls,
model: CustomGPR,
dt: Union[float, int],
input: dict[str, Feature],
output: dict[str, OutputFeature],
training_info: Optional[dict] = None,
):
"""
Args:
model: GaussianProcessRegressor from ScikitLearn.
dt: The length of time step of one prediction of GPR in seconds.
input: GPR input variables with their lag order.
output: GPR output variables (which are automatically also inputs, as
we need them recursively in MPC.) with their lag order.
training_info: Config of Trainer Class, which trained the Model.
Returns:
SerializedGPR version of the passed GPR.
"""
if not all(
hasattr(model, attr)
for attr in ["kernel_", "alpha_", "L_", "X_train_", "y_train_"]
):
raise ValueError(
"To serialize a GPR, a fitted GPR must be passed, "
"but an unfitted GPR has been passed here."
)
kernel_parameters = GPRKernelParameters.from_model(model)
gpr_parameters = GPRParameters.from_model(model)
return cls(
dt=dt,
input=input,
output=output,
data_handling=model.data_handling,
kernel_parameters=kernel_parameters,
gpr_parameters=gpr_parameters,
trainer_config=training_info,
)
[docs] def deserialize(self) -> CustomGPR:
"""
Deserializes SerializedGPR object and returns a scikit learn GaussionProcessRegressor.
Returns:
gpr_fitted: GPR version of the SerializedGPR
"""
# Create unfitted GPR with standard Kernel and standard Parameters and Hyperparameters.
kernel = ConstantKernel() * RBF() + WhiteKernel()
gpr_unfitted = CustomGPR(
kernel=kernel,
copy_X_train=False,
)
# make basic fit for GPR
gpr_fitted = self._basic_fit(gpr=gpr_unfitted)
# update kernel parameters
gpr_fitted.kernel_.k1.k1.constant_value = self.kernel_parameters.constant_value
gpr_fitted.kernel_.k1.k1.constant_value_bounds = (
self.kernel_parameters.constant_value_bounds
)
gpr_fitted.kernel_.k1.k2.length_scale = self.kernel_parameters.length_scale
gpr_fitted.kernel_.k1.k2.length_scale_bounds = (
self.kernel_parameters.length_scale_bounds
)
gpr_fitted.kernel_.k2.noise_level = self.kernel_parameters.noise_level
gpr_fitted.kernel_.k2.noise_level_bounds = (
self.kernel_parameters.noise_level_bounds
)
gpr_fitted.kernel_.theta = np.array(self.kernel_parameters.theta)
# update gpr_parameters
gpr_fitted.L_ = np.array(self.gpr_parameters.L)
gpr_fitted.X_train_ = np.array(self.gpr_parameters.X_train)
gpr_fitted.y_train_ = np.array(self.gpr_parameters.y_train)
gpr_fitted.alpha_ = np.array(self.gpr_parameters.alpha)
gpr_fitted.n_features_in_ = np.array(self.gpr_parameters.n_features_in)
gpr_fitted.log_marginal_likelihood_value_ = np.array(
self.gpr_parameters.log_marginal_likelihood_value
)
# update data handling
gpr_fitted.data_handling.normalize = self.data_handling.normalize
gpr_fitted.data_handling.scale = self.data_handling.scale
if self.data_handling.mean:
gpr_fitted.data_handling.mean = np.array(self.data_handling.mean)
if self.data_handling.std:
gpr_fitted.data_handling.std = np.array(self.data_handling.std)
return gpr_fitted
def _basic_fit(self, gpr: GaussianProcessRegressor):
"""
Runs an easy fit to be able to populate with kernel_parameters and gpr_parameters
afterward and therefore really fit it.
Args:
gpr: Unfitted GPR to fit
Returns:
gpr: fitted GPR
"""
x = np.ones((1, len(self.input)))
y = np.ones((1, len(self.output)))
gpr.fit(
X=x,
y=y,
)
return gpr
[docs]class LinRegParameters(BaseModel):
coef: list = Field(
title="coefficients",
description="Estimated coefficients for the linear regression problem. If multiple targets are passed during the fit (y 2D), this is a 2D array of shape (n_targets, n_features), while if only one target is passed, this is a 1D array of length n_features.",
)
intercept: Union[float, list] = Field(
title="intercept",
description="Independent term in the linear model. Set to 0.0 if fit_intercept = False.",
)
n_features_in: int = Field(
title="number of input features",
description="Number of features seen during fit.",
)
rank: int = Field(
title="rank",
description="Rank of matrix X. Only available when X is dense.",
)
singular: list = Field(
title="singular",
description="Singular values of X. Only available when X is dense.",
)
[docs]class SerializedLinReg(SerializedMLModel):
"""
Contains scikit-learn LinearRegression and provides functions to transform
these to SerializedLinReg objects and vice versa.
Attributes:
"""
parameters: LinRegParameters = Field(
title="parameters",
description="Parameters of kernel of the fitted linear model.",
)
model_config = ConfigDict(arbitrary_types_allowed=True)
model_type: MLModels = MLModels.LINREG
[docs] @classmethod
def serialize(
cls,
model: LinearRegression,
dt: Union[float, int],
input: dict[str, Feature],
output: dict[str, OutputFeature],
training_info: Optional[dict] = None,
):
"""
Args:
model: LinearRegression from ScikitLearn.
dt: The length of time step of one prediction of LinReg in seconds.
input: LinReg input variables with their lag order.
output: LinReg output variables (which are automatically also inputs, as "
"we need them recursively in MPC.) with their lag order.
training_info: Config of Trainer Class, which trained the Model.
Returns:
SerializedLinReg version of the passed linear model.
"""
if not all(
hasattr(model, attr)
for attr in ["coef_", "intercept_", "n_features_in_", "rank_", "singular_"]
):
raise ValueError(
"To serialize a GPR, a fitted GPR must be passed, "
"but an unfitted GPR has been passed here."
)
parameters = {
"coef": model.coef_.tolist(),
"intercept": model.intercept_.tolist(),
"n_features_in": model.n_features_in_,
"rank": model.rank_,
"singular": model.singular_.tolist(),
}
parameters = LinRegParameters(**parameters)
return cls(
dt=dt,
input=input,
output=output,
parameters=parameters,
trainer_config=training_info,
)
[docs] def deserialize(self) -> LinearRegression:
"""
Deserializes SerializedLinReg object and returns a LinearRegression object of scikit-learn.
Returns:
linear_model_fitted: LinearRegression version of the SerializedLinReg
"""
linear_model_unfitted = LinearRegression()
linear_model_fitted = self._basic_fit(linear_model=linear_model_unfitted)
# update parameters
linear_model_fitted.coef_ = np.array(self.parameters.coef)
linear_model_fitted.intercept_ = np.array(self.parameters.intercept)
linear_model_fitted.n_features_in_ = self.parameters.n_features_in
linear_model_fitted.rank_ = self.parameters.rank
linear_model_fitted.singular_ = np.array(self.parameters.singular)
return linear_model_fitted
def _basic_fit(self, linear_model: LinearRegression):
"""
Runs an easy fit to be able to populate with parameters and gpr_parameters
afterward and therefore really fit it.
Args:
linear_model: Unfitted linear model to fit.
Returns:
linear_model: fitted linear model.
"""
x = np.ones((1, len(self.input)))
y = np.ones((1, len(self.output)))
linear_model.fit(
X=x,
y=y,
)
return linear_model
serialized_models = {
MLModels.ANN: SerializedANN,
MLModels.GPR: SerializedGPR,
MLModels.LINREG: SerializedLinReg,
}