Source code for agentlib_mpc.modules.estimation.mhe

import os
from typing import Dict, Optional, Tuple

import pandas as pd
import pydantic
from agentlib.core import (
    BaseModuleConfig,
    BaseModule,
    Agent,
    AgentVariable,
    Source,
)
from agentlib.core.errors import ConfigurationError
from pydantic import Field

from agentlib_mpc.data_structures import mpc_datamodels
from agentlib_mpc.data_structures.mpc_datamodels import Results
from agentlib_mpc.modules.mpc import create_optimization_backend
from agentlib_mpc.optimization_backends.backend import (
    OptimizationBackendT,
)
from agentlib_mpc.utils.analysis import load_mpc, load_mpc_stats

AG_VAR_DICT = dict[str, AgentVariable]


[docs]class MHEConfig(BaseModuleConfig): """ Pydantic data model for MPC configuration parser """ optimization_backend: dict time_step: float = Field( default=60, ge=0, description="Time step of the MHE.", ) horizon: int = Field( default=5, ge=0, description="Estimation horizon of the MHE.", ) known_parameters: mpc_datamodels.MPCVariables = Field( default=[], description="List of known parameters of the MHE. They are " "constant over the horizon. Parameters not listed " "here will have their default from the model file.", ) estimated_parameters: mpc_datamodels.MPCVariables = Field( default=[], description="List of unknown parameters of the MHE. They are " "constant over the horizon and will be estimated.", ) known_inputs: mpc_datamodels.MPCVariables = Field( default=[], description="List of known input variables of the MHE. Includes " "controls, disturbances, setpoints, dynamic constraint boundaries etc.", ) estimated_inputs: mpc_datamodels.MPCVariables = Field( default=[], description="List of unknown input variables of the MHE. Includes " "mainly disturbances.", ) # AgentVariables for the initial condition of states to be optimized states: mpc_datamodels.MPCVariables = Field( default=[], description="List of all differential states of the MHE.", ) state_weights: dict[str, float] = Field( title="State Weights", default={}, description="Mapping of state names to their weight in the MHE problem. If " "you are certain with your measurement, chose a high value. If " "you dont have a measurement / do not trust it, choose 0. Default " "is 0.", ) shared_variable_fields: list[str] = []
[docs] @classmethod @pydantic.field_validator("state_weights") def state_weights_are_in_states( cls, state_weights: dict, info: pydantic.ValidationInfo ): state_names = {s.name for s in info.data["states"]} state_weight_names = set(state_weights) missing_names = state_weight_names - state_names if missing_names: raise ValueError( f"The following states defined in state weights do not exist in the " f"states: {', '.join(missing_names)}" ) return state_weights
[docs]class MHE(BaseModule): """ A moving horizon estimator. """ config_type = MHEConfig config: MHEConfig var_ref: mpc_datamodels.MHEVariableReference def __init__(self, config: dict, agent: Agent): """ Constructor for model predictive controller (MPC). Args: config: name of the module agent: agent the module belongs to Configs: outputs (object): inputs (object): ts: time step in s n (int): prediction horizon nc (int): control horizon (default prediction horizon) """ super().__init__(config=config, agent=agent) measured_states, weights_states = self._create_auxiliary_variables() self.measured_states: AG_VAR_DICT = measured_states self.weights_states: AG_VAR_DICT = weights_states # creates a reference of variables, which have to be kept track of in a # dataframe, to provide a past trajectory for the MHE self._history_var_names: list[str] = [ v.name for v in self.config.known_inputs ] + list(measured_states) self.history: pd.DataFrame = pd.DataFrame( columns=self._history_var_names, dtype=float ) # construct the optimization problem try: self._init_optimization() except (RuntimeError, ValueError) as err: raise ConfigurationError( f"The optimization backend of Agent {self.source} could not " f"finish its setup!" ) from err def _setup_optimization_backend(self) -> OptimizationBackendT: """Performs the setup of the optimization_backend, keeps track of status""" self.init_status = mpc_datamodels.InitStatus.during_update opti_back = create_optimization_backend( self.config.optimization_backend, self.agent.id ) opti_back.register_logger(self.logger) disc_opts = opti_back.config.discretization_options disc_opts.time_step = self.config.time_step return opti_back def _setup_var_ref(self) -> mpc_datamodels.MHEVariableReference: var_ref = mpc_datamodels.MHEVariableReference.from_config(self.config) var_ref.measured_states = list(self.measured_states) var_ref.weights_states = list(self.weights_states) return var_ref def _after_config_update(self): self._create_auxiliary_variables() self.var_ref = self._setup_var_ref() self.optimization_backend = self._setup_optimization_backend() self._init_optimization() self.init_status = mpc_datamodels.InitStatus.ready def _init_optimization(self): """Performs the setup of the optimization backend.""" self.optimization_backend.setup_optimization( var_ref=self.var_ref, ) self.logger.info("%s: Initialized optimization problem.", self.agent.id)
[docs] def process(self): while True: current_vars = self.collect_variables_for_optimization() solution = self.optimization_backend.solve( now=self.env.now, current_vars=current_vars ) self._set_estimation(solution) self._remove_old_values_from_history() yield self.env.timeout(self.config.time_step)
def _remove_old_values_from_history(self): """Clears the history of all entries that are older than current time minus horizon length.""" backwards_horizon_seconds = self.config.horizon * self.config.time_step oldest_relevant_time = self.env.now - backwards_horizon_seconds filt = self.history.index >= oldest_relevant_time self.history = self.history[filt] def _set_estimation(self, solution: Results): """Sets the estimated variables to the DataBroker.""" # parameters are scalars defined at the beginning of the problem, so we send # the first value in the parameter trajectory for parameter in self.var_ref.estimated_parameters: par_val = solution[parameter] self.set(parameter, par_val) # we want to know the most recent value of states and inputs for var in self.var_ref.states + self.var_ref.estimated_inputs: value = solution[var][-1] self.set(var, float(value))
[docs] def register_callbacks(self): """Registers callbacks which listen to the variables which have to be saved as time series. These callbacks save the values in the history for use in the optimization.""" for inp in self.var_ref.known_inputs: var = self.get(inp) self.agent.data_broker.register_callback( alias=var.alias, source=var.source, callback=self._callback_hist_vars, name=var.name, ) # registers callback which listens to alias/source of the state variable, but # gets the name of the measured state as parameter, to correctly save it in the # history for state, meas_state in zip(self.var_ref.states, self.var_ref.measured_states): var = self.get(state) self.agent.data_broker.register_callback( alias=var.alias, source=var.source, callback=self._callback_hist_vars, name=meas_state, )
[docs] def collect_variables_for_optimization( self, var_ref: mpc_datamodels.MHEVariableReference = None ) -> Dict[str, AgentVariable]: """Gets all variables noted in the var ref and puts them in a flat dictionary. The MHE Version of this function has to perform some checks and lookups extra, since variables come from different sources, and some need to incorporate trajectories of past values.""" if var_ref is None: var_ref = self.var_ref # first fetch all variables with get, that are in the config all_variables = {v: self.get(v) for v in var_ref.all_variables()} # then, collect the variables for the weights and measured states, that have # been generated and are not in the config for ms_name, ms_var in self.measured_states.items(): all_variables[ms_name] = ms_var.copy() for w_name, w_var in self.weights_states.items(): all_variables[w_name] = w_var.copy() # for values whose past trajectory is required in the optimization, set the # var value to that trajectory for hist_var in self._history_var_names: past_values = self.history[hist_var].dropna() if not any(past_values): # if the history of a variable is empty, fallback to the scalar value continue # create copy to not mess up scalar value of original variable in case # fallback is needed all_variables[hist_var].value = past_values return all_variables
def _callback_hist_vars(self, variable: AgentVariable, name: str): """Adds received measured inputs to the past trajectory.""" self.history.loc[variable.timestamp, name] = variable.value def _create_auxiliary_variables(self) -> tuple[AG_VAR_DICT, AG_VAR_DICT]: """Creates variables holding the weights and measurements of the states""" states: mpc_datamodels.MPCVariables = self.config.states measured_states: dict[str, AgentVariable] = {} weights_states: dict[str, AgentVariable] = {} for state in states: weight_name = "weight_" + state.name measurement_name = "measured_" + state.name weights_states[weight_name] = mpc_datamodels.MPCVariable( name=weight_name, value=self.config.state_weights.get(state.name, 0), type="float", source=Source(module_id=self.id), ) measured_states[measurement_name] = mpc_datamodels.MPCVariable( name=measurement_name, value=pd.Series(state.value), type="pd.Series", source=state.source, ) self.weights_states = weights_states self.measured_states = measured_states return measured_states, weights_states
[docs] def get_results(self) -> Optional[pd.DataFrame]: """Read the results that were saved from the optimization backend and returns them as Dataframe. Returns: (results, stats) tuple of Dataframes. """ results_file = self.optimization_backend.config.results_file try: results, _ = self.read_results_file(results_file) return results except FileNotFoundError: self.logger.error("Results file %s was not found.", results_file) return None
[docs] @staticmethod def read_results_file(results_file: str) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Read the provided csv-file as an MPC results file. Args: results_file: File path Returns: results, stats results is the Dataframe with all inputs and outputs of the MPC optimizations. stats is the Dataframe with matching solver stats """ results = load_mpc(results_file) stats = load_mpc_stats(results_file) return results, stats
[docs] def cleanup_results(self): results_file = self.optimization_backend.config.results_file if not results_file: return os.remove(results_file) os.remove(mpc_datamodels.stats_path(results_file))