"""Module implementing the coordinated ADMM module, which works together
with a coordinator."""
from collections import namedtuple
from typing import Dict, Optional, List
import pandas as pd
import pydantic
from agentlib_mpc.data_structures.mpc_datamodels import MPCVariable
from .admm import ADMM, ADMMConfig
from agentlib_mpc.modules.dmpc.employee import MiniEmployee, MiniEmployeeConfig
from agentlib.utils.validators import convert_to_list
import agentlib_mpc.data_structures.coordinator_datatypes as cdt
import agentlib_mpc.data_structures.admm_datatypes as adt
from agentlib.core import AgentVariable, Agent
coupInput = namedtuple("coup_input", ["mean", "lam"])
[docs]class CoordinatedADMMConfig(MiniEmployeeConfig, ADMMConfig):
shared_variable_fields: list[str] = MiniEmployeeConfig.default(
"shared_variable_fields"
) + ADMMConfig.default("shared_variable_fields")
[docs] @pydantic.field_validator("couplings", "exchange")
def couplings_should_have_values(cls, value: List[AgentVariable]):
"""Asserts that couplings and exchange have values, as they are needed for
initial guess."""
for var in value:
if var.value is None:
raise ValueError(
"Couplings and Exchange Variables should have a value, as it is "
"required for the initial guess."
)
return value
[docs]class CoordinatedADMM(MiniEmployee, ADMM):
"""
Module to implement an ADMM agent, which is guided by a coordinator.
Only optimizes based on callbacks.
"""
config: CoordinatedADMMConfig
def __init__(self, *, config: dict, agent: Agent):
self._initial_setup = True # flag to check that we don't compile ipopt twice
super().__init__(config=config, agent=agent)
self._optimization_inputs: Dict[str, AgentVariable] = {}
self._create_coupling_alias_to_name_mapping()
self._result: Optional[pd.DataFrame] = None
[docs] def process(self):
# send registration request to coordinator
timeout = self.config.registration_interval
while True:
if not self._registered_coordinator:
guesses, ex_guess = self._initial_coupling_values()
answer = adt.AgentToCoordinator(
local_trajectory=guesses, local_exchange_trajectory=ex_guess
)
self.set(cdt.REGISTRATION_A2C, answer.to_json())
yield self.env.timeout(timeout)
[docs] def registration_callback(self, variable: AgentVariable):
"""callback for registration"""
if self._registered_coordinator:
# ignore if registration has already been done
return
self.logger.debug(
f"receiving {variable.name}={variable.value} from {variable.source}"
)
# global parameters to define optimisation problem
value = cdt.RegistrationMessage(**variable.value)
if not value.agent_id == self.source.agent_id:
return
options = adt.ADMMParameters(**value.opts)
self._set_admm_parameters(options=options)
guesses, ex_guess = self._initial_coupling_values()
answer = adt.AgentToCoordinator(
local_trajectory=guesses, local_exchange_trajectory=ex_guess
)
self._registered_coordinator = variable.source
self.set(cdt.REGISTRATION_A2C, answer.to_json())
def _after_config_update(self):
# use some hacks to set jit false for the first time this function is called
if (
self.config.optimization_backend.get("do_jit", False)
and self._initial_setup
):
do_jit = True
self.config.optimization_backend["do_jit"] = False
else:
do_jit = False
super()._after_config_update()
if self._initial_setup:
self.config.optimization_backend["do_jit"] = do_jit
self._initial_setup = False
[docs] def get_new_measurement(self):
"""
Retrieve new measurement from relevant sensors
Returns:
"""
opt_inputs = self.collect_variables_for_optimization()
opt_inputs[adt.PENALTY_FACTOR] = self.penalty_factor_var
self._optimization_inputs = opt_inputs
def _create_coupling_alias_to_name_mapping(self):
"""
creates a mapping of alias to the variable names for multiplier and
global mean that the optimization backend recognizes
Returns:
"""
alias_to_input_names = {}
for coupling in self.var_ref.couplings:
coup_variable = self.get(coupling.name)
coup_in = coupInput(mean=coupling.mean, lam=coupling.multiplier)
alias_to_input_names[coup_variable.alias] = coup_in
for coupling in self.var_ref.exchange:
coup_variable = self.get(coupling.name)
coup_in = coupInput(mean=coupling.mean_diff, lam=coupling.multiplier)
alias_to_input_names[coup_variable.alias] = coup_in
self._alias_to_input_names = alias_to_input_names
[docs] def optimize(self, variable: AgentVariable):
"""
Performs the optimization given the mean trajectories and multipliers from the
coordinator.
Replies with the local optimal trajectories.
Returns:
"""
# unpack message
updates = adt.CoordinatorToAgent.from_json(variable.value)
if not updates.target == self.source.agent_id:
return
self.logger.debug("Received update from Coordinator.")
# load mpc inputs and current coupling inputs of this iteration
opt_inputs = self._optimization_inputs.copy()
# add the coupling inputs of this iteration to the other mpc inputs
for alias, multiplier in updates.multiplier.items():
coup_in = self._alias_to_input_names[alias]
opt_inputs[coup_in.lam] = MPCVariable(name=coup_in.lam, value=multiplier)
opt_inputs[coup_in.mean] = MPCVariable(
name=coup_in.mean, value=updates.mean_trajectory[alias]
)
for alias, multiplier in updates.exchange_multiplier.items():
coup_in = self._alias_to_input_names[alias]
opt_inputs[coup_in.lam] = MPCVariable(name=coup_in.lam, value=multiplier)
opt_inputs[coup_in.mean] = MPCVariable(
name=coup_in.mean, value=updates.mean_diff_trajectory[alias]
)
opt_inputs[adt.PENALTY_FACTOR].value = updates.penalty_parameter
# perform optimization
self._result = self.optimization_backend.solve(
now=self._start_optimization_at, current_vars=opt_inputs
)
# send optimizationData back to coordinator to signal finished
# optimization. Select only trajectory where index is at least zero, to not
# send lags
cons_traj = {}
exchange_traj = {}
for coup in self.config.couplings:
cons_traj[coup.alias] = self._result[
coup.name
] # we can serialize numpy now, maybe make this easier
for exchange in self.config.exchange:
exchange_traj[exchange.alias] = self._result[exchange.name]
opt_return = adt.AgentToCoordinator(
local_trajectory=cons_traj, local_exchange_trajectory=exchange_traj
)
self.logger.debug("Sent optimal solution.")
self.set(name=cdt.OPTIMIZATION_A2C, value=opt_return.to_json())
def _finish_optimization(self):
"""
Finalize an iteration. Usually, this includes setting the actuation.
Returns:
"""
# this check catches the case, where the agent was not alive / registered at
# the start of the round and thus did not participate and has no result
# Since the finish-signal of the coordinator is broadcast, it will trigger this
# function even if the agent did not participate in the optimization before
if self._result is not None:
self.set_actuation(self._result)
self._result = None
def _set_admm_parameters(self, options: adt.ADMMParameters):
"""Sets new admm parameters, re-initializes the optimization problem
and returns an initial guess of the coupling variables."""
# update the config with new parameters
new_config_dict = self.config.model_dump()
new_config_dict.update(
{
adt.PENALTY_FACTOR: options.penalty_factor,
cdt.TIME_STEP: options.time_step,
cdt.PREDICTION_HORIZON: options.prediction_horizon,
}
)
self.config = new_config_dict
self.logger.info("%s: Reinitialized optimization problem.", self.agent.id)
def _initial_coupling_values(self) -> tuple[Dict[str, list], Dict[str, list]]:
"""Gets the initial coupling values with correct trajectory length."""
grid_len = len(self.optimization_backend.coupling_grid)
guesses = {}
exchange_guesses = {}
for var in self.config.couplings:
val = convert_to_list(var.value)
# this overrides more precise guesses, but is more stable
guesses[var.alias] = [val[0]] * grid_len
for var in self.config.exchange:
val = convert_to_list(var.value)
exchange_guesses[var.alias] = [val[0]] * grid_len
return guesses, exchange_guesses
[docs] def init_iteration_callback(self, variable: AgentVariable):
"""Callback that answers the coordinators init_iteration flag."""
if self._registered_coordinator:
super().init_iteration_callback(variable)