# Source code for agentlib_mpc.modules.mpc

"""Holds the base class for MPCs."""

import os
from typing import Tuple, Dict, Optional

import pandas as pd
from pydantic import Field, field_validator

from agentlib.core.datamodels import (
    AgentVariable,
)
from agentlib.core import Model, BaseModule, BaseModuleConfig, Agent
from agentlib.core.errors import OptionalDependencyError, ConfigurationError
from agentlib.utils import custom_injection
from pydantic_core.core_schema import FieldValidationInfo

from agentlib_mpc.data_structures.mpc_datamodels import (
    VariableReference,
    InitStatus,
    Results,
)
from agentlib_mpc.optimization_backends import backend_types, uninstalled_backend_types
from agentlib_mpc.optimization_backends.backend import (
    OptimizationBackend,
    OptimizationBackendT,
)
from agentlib_mpc.data_structures import mpc_datamodels
from agentlib_mpc.utils.analysis import load_mpc, load_mpc_stats


class BaseMPCConfig(BaseModuleConfig):
    """
    Pydantic data model for MPC configuration parser.

    Collects the horizon/time-step settings and the AgentVariable lists
    (parameters, inputs, outputs, controls, states) that define the MPC.
    """

    # todo use config of optimization backend in annotation and create like modules
    # Raw config dict for the optimization backend; must contain a "type" key
    # (see create_optimization_backend).
    optimization_backend: dict
    # Discretization time step in seconds (also the default control interval).
    time_step: float = Field(
        default=60,
        ge=0,
        description="Time step of the MPC.",
    )
    # Number of time steps the MPC looks ahead.
    prediction_horizon: int = Field(
        default=5,
        ge=0,
        description="Prediction horizon of the MPC.",
    )
    # Interval between optimization runs; defaults to time_step via the
    # validator below (validate_default=True ensures the validator runs
    # even when the field is left at None).
    sampling_time: Optional[float] = Field(
        default=None,  # seconds
        description="Sampling interval for control steps. If None, will be the same as"
        " time step. Does not affect the discretization of the MPC, "
        "only the interval with which there will be optimization steps.",
        validate_default=True,
    )
    # Model parameters held constant over the horizon.
    parameters: mpc_datamodels.MPCVariables = Field(
        default=[],
        description="List of model parameters of the MPC. They are "
        "constant over the horizon. Parameters not listed "
        "here will have their default from the model file.",
    )
    # Exogenous inputs: disturbances, set points, dynamic constraint bounds.
    inputs: mpc_datamodels.MPCVariables = Field(
        default=[],
        description="List of all input variables of the MPC. Includes "
        "predictions for disturbances, set_points, dynamic "
        "constraint boundaries etc.",
    )
    outputs: mpc_datamodels.MPCVariables = Field(
        default=[], description="List of all shared outputs of the MPC. "
    )
    # AgentVariables for the controls to be optimized
    controls: mpc_datamodels.MPCVariables = Field(
        default=[], description="List of all control variables of the MPC. "
    )
    # AgentVariables for the initial condition of states to be optimized
    states: mpc_datamodels.MPCVariables = Field(
        default=[],
        description="List of all differential states of the MPC. The "
        "entries can define the boundaries and the source for the measurements",
    )
    # When True, set_output() sends full output trajectories to the broker.
    set_outputs: bool = Field(
        default=False,
        description="Sets the full output time series to the data broker.",
    )
    # Fields whose variables are shared with other agents by default.
    shared_variable_fields: list[str] = ["outputs", "controls"]
[docs] @field_validator("sampling_time") @classmethod def default_sampling_time(cls, samp_time, info: FieldValidationInfo): if samp_time is None: samp_time = info.data["time_step"] return samp_time
def create_optimization_backend(optimization_backend, agent_id):
    """Set up the optimization_backend.

    Args:
        optimization_backend: Backend config dict. Must contain a "type"
            entry, which is either the name of a built-in backend or a
            dict describing a custom class to inject.
        agent_id: Id of the owning agent, stored as the backend's name.

    Returns:
        The instantiated OptimizationBackend.

    Raises:
        KeyError: If the config does not contain a "type" entry.
        OptionalDependencyError: If the chosen backend requires a package
            that is not installed.
        TypeError: If "type" is neither str nor dict, names an unknown
            backend, or the created object is not an OptimizationBackend.
    """
    # work on a copy so the caller's config dict is not mutated
    optimization_backend = optimization_backend.copy()
    if "type" not in optimization_backend:
        raise KeyError(
            "Given model config does not contain key 'type' (type of the model)."
        )
    _type = optimization_backend.pop("type")
    optimization_backend["name"] = agent_id
    if isinstance(_type, dict):
        # custom backend: import the class described by the dict
        custom_cls = custom_injection(config=_type)
        backend = custom_cls(**optimization_backend)
    elif isinstance(_type, str):
        if _type in uninstalled_backend_types:
            raise OptionalDependencyError(
                dependency_name=_type,
                dependency_install=uninstalled_backend_types[_type],
            )
        if _type not in backend_types:
            raise TypeError(
                f"Given backend is not a valid internal optimization "
                f"backend. Supported backends are "
                f"{', '.join(list(backend_types.keys()))}"
            )
        backend = backend_types[_type](config=optimization_backend)
    else:
        raise TypeError(
            f"Error loading optimization backend. Config "
            f"'type' has to be either str or dict. Got "
            f"{type(_type)} instead. "
        )
    # explicit check instead of `assert`, which would be stripped under -O
    if not isinstance(backend, OptimizationBackend):
        raise TypeError(
            f"Loaded backend class {type(backend)} is not a subclass of "
            f"OptimizationBackend."
        )
    return backend
class BaseMPC(BaseModule):
    """
    A model predictive controller. More info to follow.
    """

    # validated module configuration (horizon, time step, variable lists, ...)
    config: BaseMPCConfig

    def __init__(self, config: dict, agent: Agent):
        """
        Constructor for model predictive controller (MPC).

        Args:
            config: name of the module
            agent: agent the module belongs to

        Raises:
            AssertionError: if the model has inputs that are not declared
                in the MPC config (they would be free during optimization).
        """
        # mark the module as not yet initialized before the base class runs,
        # so callbacks fired during setup see a defined status
        self.init_status = mpc_datamodels.InitStatus.pre_module_init
        super().__init__(config=config, agent=agent)

        # Check that module config and model variables match
        unassigned_model_variables = self.assert_mpc_variables_are_in_model()
        assert unassigned_model_variables["inputs"] == set(), (
            f"All model inputs must be declared in the MPC config. Model "
            f"variable(s) '{unassigned_model_variables['inputs']}' is/are free."
        )

    def _setup_optimization_backend(self) -> OptimizationBackendT:
        """Performs the setup of the optimization_backend, keeps track of status"""
        self.init_status = mpc_datamodels.InitStatus.during_update
        opti_back = create_optimization_backend(
            self.config.optimization_backend, self.agent.id
        )
        opti_back.register_logger(self.logger)
        # propagate horizon and time step from the module config into the
        # backend's discretization options
        disc_opts = opti_back.config.discretization_options
        disc_opts.prediction_horizon = self.config.prediction_horizon
        disc_opts.time_step = self.config.time_step
        return opti_back

    def _setup_var_ref(self) -> mpc_datamodels.VariableReferenceT:
        """Builds the variable reference (names by role) from the config."""
        return VariableReference.from_config(self.config)

    def _after_config_update(self):
        """Rebuilds var_ref and backend after a config change, then re-inits
        the optimization problem and marks the module ready."""
        self.var_ref: mpc_datamodels.VariableReferenceT = self._setup_var_ref()
        self.optimization_backend: OptimizationBackendT = (
            self._setup_optimization_backend()
        )
        self._init_optimization()
        self.init_status = mpc_datamodels.InitStatus.ready
[docs] def assert_subset(self, mpc_names, model_names, message_head): """ Helper function for assert assert_mpc_variables_are_in_model. Asserts the variables of the var_ref corresponding to ref_key are a subset of a list of names provided (usually obtained from the model) and prints out an error if false. Returns the portion of model_names that are not in the given var_ref. """ assert set(mpc_names).issubset(model_names), ( f"{message_head} of MPC {self.agent.id} are not contained in " f"model. Names must match. The following variables defined for the " f"MPC do not appear in the model: " f"'{set(mpc_names).difference(model_names)}'." ) return set(model_names).difference(mpc_names)
[docs] def assert_mpc_variables_are_in_model(self) -> dict[str, set[str]]: """ Checks whether all variables of var_ref are contained in the model. Returns names of model variables not contained in the var_ref, sorted by keys: 'states', 'inputs', 'outputs', 'parameters'. """ # arguments for validation function: # (key in var_ref, model names, str for head error message) args = [ ( "states", self.model.get_state_names(), "Differential variables / States", ), ("controls", self.model.get_input_names(), "Controls"), ("inputs", self.model.get_input_names(), "Inputs"), ("outputs", self.model.get_output_names(), "Outputs"), ("parameters", self.model.get_parameter_names(), "Parameters"), ] # perform validations and make a dictionary of unassigned variables unassigned_by_mpc_var = { key: self.assert_subset(self.var_ref.__dict__[key], names, message) for key, names, message in args } # fix unassigned values for inputs intersection_input = set(unassigned_by_mpc_var["controls"]).intersection( unassigned_by_mpc_var["inputs"] ) # return dict should have model variables as keys, not mpc variables unassigned_by_model_var = { "states": unassigned_by_mpc_var["states"], "inputs": intersection_input, "outputs": unassigned_by_mpc_var["outputs"], "parameters": unassigned_by_mpc_var["parameters"], } return unassigned_by_model_var
[docs] def collect_variables_for_optimization( self, var_ref: mpc_datamodels.VariableReference = None ) -> Dict[str, AgentVariable]: """Gets all variables noted in the var ref and puts them in a flat dictionary.""" if var_ref is None: var_ref = self.var_ref return {v: self.get(v) for v in var_ref.all_variables()}
# class AgVarDropin: # ub: float # lb: float # value: Union[float, list, pd.Series] # interpolation_method: InterpolationMethod
    def process(self):
        """Main simulation loop: perform one MPC step, then wait one
        time_step of simulation time, forever."""
        while True:
            self.do_step()
            yield self.env.timeout(self.config.time_step)
[docs] def register_callbacks(self): """Registers the init_optimization callback to all parameters which cannot be changed without recreating the optimization problem.""" for key in OptimizationBackend.mpc_backend_parameters: self.agent.data_broker.register_callback( alias=key, source=None, callback=self.re_init_optimization )
    def _init_optimization(self):
        """Performs the setup of the optimization backend.

        Raises:
            ConfigurationError: if the backend setup fails with a
                RuntimeError or ValueError.
        """
        try:
            self.optimization_backend.setup_optimization(var_ref=self.var_ref)
        except (RuntimeError, ValueError) as err:
            # surface backend failures as a configuration problem of this agent
            raise ConfigurationError(
                f"The optimization backend of Agent {self.source} could not "
                f"finish its setup!"
            ) from err
        self.logger.info("%s: Initialized optimization problem.", self.agent.id)
[docs] def re_init_optimization(self, parameter: AgentVariable): """Re-initializes the optimization backend with new parameters.""" self.optimization_backend.discretization_options[ parameter.name ] = parameter.value self._init_optimization()
    @property
    def model(self) -> Model:
        """
        Getter for current simulation model

        Returns:
            agentlib.model: Current simulation model
        """
        # the model is owned by the optimization backend
        return self.optimization_backend.model
    def pre_computation_hook(self):
        """
        This method is called in every computation step before the
        optimization starts. Overwrite this method in a derived subclass
        if you want to take some actions each time before the optimal
        control problem is solved.
        """
        pass
[docs] def do_step(self): """ Performs an MPC step. """ if not self.init_status == InitStatus.ready: self.logger.warning("Skipping step, optimization_backend is not ready.") return self.pre_computation_hook() # get new values from data_broker updated_vars = self.collect_variables_for_optimization() # solve optimization problem with up-to-date values from data_broker result = self.optimization_backend.solve(self.env.time, updated_vars) # Set variables in data_broker self.set_actuation(result) self.set_output(result)
[docs] def set_actuation(self, solution: Results): """Takes the solution from optimization backend and sends the first step to AgentVariables.""" self.logger.info("Sending optimal control values to data_broker.") for control in self.var_ref.controls: # take the first entry of the control trajectory actuation = solution[control][0] self.set(control, actuation)
[docs] def set_output(self, solution: Results): """Takes the solution from optimization backend and sends it to AgentVariables.""" # Output must be defined in the conig as "type"="pd.Series" if not self.config.set_outputs: return self.logger.info("Sending optimal output values to data_broker.") df = solution.df for output in self.var_ref.outputs: series = df.variable[output] self.set(output, series)
[docs] def get_results(self) -> Optional[pd.DataFrame]: """Read the results that were saved from the optimization backend and returns them as Dataframe. Returns: (results, stats) tuple of Dataframes. """ results_file = self.optimization_backend.config.results_file if results_file is None or not self.optimization_backend.config.save_results: self.logger.info("No results were saved .") return None try: result, stat = self.read_results_file(results_file) self.warn_for_missed_solves(stat) return result except FileNotFoundError: self.logger.error("Results file %s was not found.", results_file) return None
[docs] def warn_for_missed_solves(self, stats: Optional[pd.DataFrame]): """ Read the solver information from the optimization Returns: Warning if solver fails """ if stats is None: return if stats["success"].all(): return failures = ~stats["success"] failure_indices = failures[failures].index.tolist() self.logger.warning( f"Warning: There were failed optimizations at the following times: " f"{failure_indices}." )
[docs] @staticmethod def read_results_file(results_file: str) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Read the provided csv-file as an MPC results file. Args: results_file: File path Returns: results, stats results is the Dataframe with all inputs and outputs of the MPC optimizations. stats is the Dataframe with matching solver stats """ results = load_mpc(results_file) stats = load_mpc_stats(results_file) return results, stats
[docs] def cleanup_results(self): results_file = self.optimization_backend.config.results_file if not results_file: return os.remove(results_file) os.remove(mpc_datamodels.stats_path(results_file))