# Source code for agentlib_mpc.modules.dmpc.admm.admm

"""Holds functionality for ADMM modules."""

import time
import threading
from typing import List, Dict, Tuple, Iterable, Optional, TypeVar, Union
import queue
from enum import Enum, auto

import numpy as np
import pandas as pd
from agentlib.core.errors import ConfigurationError
from pydantic import field_validator, Field

from agentlib.core import (
    Source,
    AgentVariable,
)

from agentlib_mpc.data_structures.mpc_datamodels import MPCVariable
from agentlib_mpc.modules.dmpc import DistributedMPC, DistributedMPCConfig
from agentlib_mpc.optimization_backends.backend import ADMMBackend
from agentlib.utils.validators import convert_to_list
from agentlib_mpc.data_structures import mpc_datamodels
import agentlib_mpc.data_structures.admm_datatypes as adt
from agentlib_mpc.data_structures.mpc_datamodels import Results


# noinspection PyArgumentList
class ModuleStatus(Enum):
    """Execution states that the ADMM module stores in its ``_status`` attribute."""

    not_started = auto()
    syncing = auto()
    at_registration = auto()
    optimizing = auto()
    updating = auto()
    waiting_for_other_agents = auto()
    sleeping = auto()
# noinspection PyArgumentList
class ParticipantStatus(Enum):
    """Registration states tracked per participant in ``ADMMParticipation.status``."""

    not_participating = auto()
    available = auto()
    confirmed = auto()
    not_available = auto()
class ADMMParticipation:
    """Holds data for the status of a shared variable of another system."""

    def __init__(self, variable):
        """Store the participant's variable and start as not participating.

        Args:
            variable: The shared variable this participation entry belongs to.
        """
        self.variable: AgentVariable = variable
        self.status: ParticipantStatus = ParticipantStatus.not_participating
        # No more than two messages should stack up; the queue itself is
        # bounded at five entries.
        self.received: queue.Queue = queue.Queue(maxsize=5)

    def empty_memory(self):
        """Discard every message currently waiting in ``received``."""
        pending = self.received
        try:
            while True:
                pending.get_nowait()
        except queue.Empty:
            # queue is drained
            return

    def de_register(self):
        """Mark the participant as not participating and drop its messages."""
        self.status = ParticipantStatus.not_participating
        self.empty_memory()
class ADMMConfig(DistributedMPCConfig):
    """Configuration of an ADMM module: shared variables and algorithm settings."""

    # Variables shared with other agents, split into couplings and exchange.
    couplings: List[mpc_datamodels.MPCVariable] = []
    exchange: List[mpc_datamodels.MPCVariable] = []

    penalty_factor: float = Field(
        default=10,
        ge=0,
        description="Penalty factor of the ADMM algorithm. Should be equal for all "
        "agents.",
    )
    iteration_timeout: float = Field(
        default=20,
        ge=0,
        description="Maximum computation + waiting time for one iteration.",
    )
    registration_period: float = Field(
        default=2,
        ge=0,
        description="Time spent on registration before each optimization",
    )
    max_iterations: float = Field(
        default=20,
        ge=0,
        description="Maximum number of ADMM iterations before termination of control "
        "step.",
    )

    @field_validator(
        "exchange", "couplings", "parameters", "inputs", "outputs", "controls", "states"
    )
    @classmethod
    def check_prefixes_of_variables(cls, variables: list[AgentVariable]):
        """Ensures no user provided variable is named with the reserved ADMM
        prefix.

        Raises:
            ConfigurationError: If any variable name starts with
                ``adt.ADMM_PREFIX``.
        """
        reserved = adt.ADMM_PREFIX
        if any(var.name.startswith(reserved) for var in variables):
            raise ConfigurationError(
                f"Do not use variables that start with "
                f"'{adt.ADMM_PREFIX}' in an ADMM config."
            )
        return variables
# Type variable bound to ADMMConfig, i.e. it accepts ADMMConfig or any subclass.
ADMMConfigT = TypeVar("ADMMConfigT", bound=ADMMConfig)
class ADMM(DistributedMPC):
    """
    This class represents a module participating in a fully decentralized
    Consensus-ADMM optimization for distributed MPC.
    Agents autonomously send the values of their coupling variables, register
    other participants and perform update steps.
    """

    config: ADMMConfig
    var_ref: adt.VariableReference

    def __init__(self, config: dict, agent):
        """Initialise the module state, then delegate to the parent constructor.

        The ADMM-specific attributes are created before ``super().__init__``
        is called.
        """
        self.var_qu = queue.Queue()
        # set by ``process`` to trigger one ADMM round in the worker thread
        self.start_step = threading.Event()
        self._status: ModuleStatus = ModuleStatus.syncing
        self._registered_participants = {}
        self._admm_variables: dict[str, AgentVariable] = {}
        super().__init__(config=config, agent=agent)

    def collect_couplings_for_optimization(self):
        """Collects updated AgentVariables only of the coupling variables."""
        coup_vars = {}
        for coup in self.var_ref.couplings + self.var_ref.exchange:
            coup_vars.update(
                {v: self._admm_variables[v] for v in coup.admm_variables()}
            )
        coup_vars["penalty_factor"] = self.penalty_factor_var
        return coup_vars

    def process(self):
        """Start the ADMM worker thread and trigger one round every time step."""
        # this thread will perform the optimization whenever start_step is set
        thread = threading.Thread(
            target=self._admm_loop, daemon=True, name=f"admm_loop_{self.agent.id}"
        )
        thread.start()
        self.agent.register_thread(thread=thread)

        self._status = ModuleStatus.syncing
        yield self._sync_start()

        self.logger.info("Starting periodic execution of admm algorithm")
        while True:
            self.start_step.set()
            yield self.env.timeout(self.config.time_step)

    def _sync_start(self):
        """Waits until time is a multiple of the time step."""
        time_step = self.config.time_step
        wait_time = time_step - (time.time() % time_step)
        self.logger.info("Waiting %s s to sync admm algorithm", wait_time)
        return self.env.timeout(wait_time)

    def _admm_loop(self):
        """Triggers the optimization whenever self.start_step is set."""
        while True:
            self._status = ModuleStatus.sleeping
            self.start_step.wait()
            self.start_step.clear()
            self.admm_step()
            # The event was set again while the step was still running, so
            # that request is dropped and the loop waits for the next one.
            if self.start_step.is_set():
                self.logger.error(
                    "Start of ADMM round was requested before "
                    "last one finished. Waiting until next "
                    "cycle."
                )
                self.start_step.clear()

    def admm_step(self):
        """Performs an entire ADMM optimization."""
        self._perform_registration()

        # get optimization inputs
        self._set_mean_coupling_values()
        opt_inputs = self.collect_variables_for_optimization()
        self.pre_computation_hook()

        # reset termination criteria
        start_iterations = self.env.time
        admm_iter = 0

        # start the ADMM iteration loop
        while True:
            start_opt = time.time()

            # Solve local optimization
            result = self._solve_local_optimization(
                opt_inputs=opt_inputs,
                current_iteration=admm_iter,
                start_time=start_iterations,
            )

            # admm coordination step
            self.send_coupling_values(result)
            self._status = ModuleStatus.waiting_for_other_agents
            self._receive_variables(start=start_opt)
            self._status = ModuleStatus.updating
            self._set_mean_coupling_values()
            self.update_lambda()
            self.reset_participants_ready()

            # check termination
            admm_iter += 1
            if self._check_termination(admm_iter, start_iterations):
                break

        self.deregister_all_participants()
        self.set_actuation(result)

    def _solve_local_optimization(
        self,
        opt_inputs: Dict[str, AgentVariable],
        current_iteration: int,
        start_time: float,
    ) -> Results:
        """
        Performs the local optimization and returns the result.

        Args:
            opt_inputs: dict with AgentVariables that stay constant between
                optimizations. Updated in place with the current coupling
                variables.
            current_iteration: current iteration number
            start_time: environment time at start of ADMM algorithm

        Returns:
            The result returned by ``optimization_backend.solve``.
        """
        updated_couplings = self.collect_couplings_for_optimization()
        opt_inputs.update(updated_couplings)
        self.logger.info("Solving local optimization #%s.", current_iteration)
        self._status = ModuleStatus.optimizing
        result = self.optimization_backend.solve(start_time, opt_inputs)
        self.logger.info("Solved local optimization #%s.", current_iteration)
        return result

    def _perform_registration(self):
        """Registers participants in current round"""
        self._status = ModuleStatus.at_registration
        self.logger.info("Start registration of round at %s.", self.env.now)

        # shift initial values for multipliers and coupling outputs
        self._shift_and_send_coupling_outputs()
        self._shift_multipliers()

        # accept registrations within a fixed time (handled by callbacks)
        time.sleep(self.config.registration_period)
        self._status = ModuleStatus.updating
        self.logger.info("Finished registration of round")

    def _check_termination(self, admm_iter: int, start_iteration: float) -> bool:
        """
        Decide whether the ADMM iteration loop should stop.

        Args:
            admm_iter: current iteration number
            start_iteration: environment time at which current optimization began

        Returns:
            True, if the algorithm should be terminated, False, if it should
            continue
        """
        self.logger.debug("Finished iteration no. %s.", admm_iter)

        # check wait_on_start_iterations
        available_runtime = self.config.time_step - self.config.registration_period
        if self.env.now - start_iteration > available_runtime:
            self.logger.warning(
                "ADMM did not converge within the specified sampling time "
                "of %ss. Terminating current control step.",
                self.config.time_step,
            )
            return True

        # check maximum iterations
        if admm_iter >= self.config.max_iterations:
            self.logger.warning(
                "ADMM did not converge within the maximum iteration number "
                "of %s. Terminating current control step.",
                self.config.max_iterations,
            )
            return True

        return False

    def _receive_variables(self, start):
        """Wait until all coupling variables arrive from the other systems.

        Args:
            start: ``time.time()`` stamp at which the current iteration began.
                The total waiting budget is ``config.iteration_timeout``
                measured from this stamp.
        """
        timeout = self.config.iteration_timeout
        remaining_time = max(timeout - (time.time() - start), 0)
        for participant in self.all_coupling_statuses():
            if participant.status == ParticipantStatus.not_participating:
                continue
            try:
                var = participant.received.get(timeout=remaining_time)
                participant.variable = var
                participant.status = ParticipantStatus.confirmed
            except queue.Empty:
                participant.de_register()
                source = participant.variable.source
                coup = participant.variable.alias
                self.logger.info(
                    "De-registered participant %s from "
                    "coupling %s as it was too slow.",
                    source,
                    coup,
                )
            # the budget is shared between all participants
            remaining_time = max(timeout - (time.time() - start), 0)

    def all_coupling_statuses(self) -> Iterable[ADMMParticipation]:
        """Gives an iterator of all ADMMParticipation that are registered."""
        for coup_participants in self.registered_participants.values():
            for participant in coup_participants.values():
                yield participant

    def _shift(self, sequence: List[float], grid: List[float]) -> List[float]:
        """
        Shifts the sequence forward by one sampling time.

        Args:
            sequence: Sequence of variable values.
            grid: Timestamps belonging to the sequence starting from 0.

        Returns:
            The shifted list with the last values duplicated. If the shift is
            zero, or if no grid point reaches the time step, an unchanged copy
            of the sequence is returned.
        """
        time_step = self.config.time_step
        # Index of the first grid point at or beyond one time step. The
        # default avoids a StopIteration when the grid ends earlier.
        index = next(
            (i for i, stamp in enumerate(grid) if stamp >= time_step),
            len(sequence),
        )
        index = min(index, len(sequence))
        if index == 0:
            # sequence[-0:] would be the whole sequence and double the length
            return sequence[:]
        return sequence[index:] + sequence[-index:]

    def _shift_multipliers(self):
        """Shifts lagrange multipliers by one sampling interval. If a scalar is
        given, expands to the correct length."""
        for coup in self.cons_and_exchange:
            grid = self.optimization_backend.coupling_grid
            var = self._admm_variables[coup.multiplier]
            val = var.value
            if len(val) == 1:
                val = val * len(grid)
            val = self._shift(sequence=val, grid=grid)
            self._admm_variables[var.name].value = val

    def _shift_and_send_coupling_outputs(self):
        """Shifts global coupling variables by one sampling interval. If a
        scalar is given, expands to the correct length.
        Sets the values as output to the data_broker, sending them."""
        self.logger.info("Sending initial coupling outputs ...")
        for coupling in self.cons_and_exchange:
            grid = self.optimization_backend.coupling_grid
            length = len(grid)

            # shift output variable
            var = self._admm_variables[coupling.local]
            val = var.value

            # expand single-element lists to the length of the grid
            if len(val) == 1:
                val = val * length
            val = self._shift(sequence=val, grid=grid)
            self.send_coupling_variable(var.name, val)

    def assert_mpc_variables_are_in_model(self):
        """Run the parent check, then match couplings and exchange variables
        against the model's inputs, outputs and states.

        Returns:
            The dictionary returned by the parent check, with the entries
            for "inputs", "outputs" and "states" replaced by the results of
            ``assert_subset`` for every matched coupling.
        """
        unassigned_model = super().assert_mpc_variables_are_in_model()
        for coup in self.config.couplings + self.config.exchange:
            # only the first category containing the name is handled
            for category in ("inputs", "outputs", "states"):
                if coup.name in unassigned_model[category]:
                    unassigned_model[category] = self.assert_subset(
                        [coup.name], unassigned_model[category], "Couplings"
                    )
                    break
        return unassigned_model

    @property
    def registered_participants(self) -> Dict[str, Dict[str, ADMMParticipation]]:
        """Dictionary containing all other agents this agent shares variables with.
        Ordered in a two-layer form, with variables at the first layer and
        agents at the second layer. Contains ADMMParticipation objects at
        the base layer.

        Examples:
            self.registered_participants =
            {'coupling_var_1': {'src_of_agent1': status_1,
                                'src_of_agent2': status_2,
                                'src_of_agent3': status_3}
             'coupling_var_2': {'src_of_agent3': status_a,
                                'src_of_agent2': status_b,
                                'src_of_agent4': status_c}
            }
            here, <status> refers to an ADMMParticipation object.
        """
        return self._registered_participants

    @registered_participants.setter
    def registered_participants(self, reg_par: Dict):
        self._registered_participants = reg_par

    @property
    def cons_and_exchange(self) -> List[Union[adt.ExchangeEntry, adt.CouplingEntry]]:
        """Exchange entries followed by coupling entries of the var_ref."""
        return self.var_ref.exchange + self.var_ref.couplings

    def reset_participants_ready(self):
        """Sets the status of every participant to ``available`` if it has
        messages waiting in its queue, otherwise to ``not_available``."""
        for participant in self.all_coupling_statuses():
            if participant.received.qsize():
                participant.status = ParticipantStatus.available
            else:
                participant.status = ParticipantStatus.not_available

    def deregister_all_participants(self):
        """Sets the participating status of all participating agents to False."""
        self.logger.info("De-registering all participants for next round.")
        for participant in self.all_coupling_statuses():
            participant.de_register()

    def participant_callback(self, variable: AgentVariable):
        """Puts received variables in the correct queue, depending on
        registration status of this agent. Variables sent by this agent itself
        are ignored."""
        if variable.source.agent_id != self.agent.id:
            self.receive_participant(variable)

    def receive_participant(self, variable: AgentVariable):
        """Set the participation to true for the given coupling input.

        During registration the participant is reset and marked as not
        available. During the iteration phases the variable is queued for
        ``_receive_variables``.
        """
        # Create copy just in case
        reg_par_of_coupling = self.registered_participants[variable.alias].copy()

        # add variables that were seen the first time
        if variable.source not in reg_par_of_coupling:
            self.logger.info(
                "Initially registered variable '%s' from '%s'.",
                variable.alias,
                variable.source,
            )
            reg_par_of_coupling[variable.source] = ADMMParticipation(variable=variable)

        neighbor: ADMMParticipation = reg_par_of_coupling[variable.source]

        # perform registration at start of round
        if self._status == ModuleStatus.at_registration:
            self.logger.debug(
                "Registered variable '%s' from '%s' for this round.",
                variable.alias,
                variable.source,
            )
            neighbor.empty_memory()
            neighbor.status = ParticipantStatus.not_available
            neighbor.variable = variable

        # confirm new trajectory during admm iterations
        if self._status in (
            ModuleStatus.waiting_for_other_agents,
            ModuleStatus.optimizing,
            ModuleStatus.updating,
        ):
            try:
                neighbor.received.put_nowait(variable)
                neighbor.status = ParticipantStatus.available
                self.logger.debug(
                    "Received variable '%s' from '%s' and set to "
                    "ready: 'True'.",
                    variable.alias,
                    variable.source,
                )
            except queue.Full:
                # NOTE(review): the participant is only reported here, it is
                # not actually de-registered.
                source = neighbor.variable.source
                coup = neighbor.variable.alias
                self.logger.error(
                    "De-registered participant %s from coupling %s as it "
                    "sends messages too quickly.",
                    source,
                    coup,
                )
            # NOTE(review): the two statements below are placed inside this
            # status check on reconstruction - confirm against upstream.
            if neighbor.received.qsize() > 2:
                self.logger.error(f"Queue is too full {neighbor.received.qsize()}")
            neighbor.variable = variable

        # Set the altered copy again
        self.registered_participants[variable.alias] = reg_par_of_coupling

    def get_participants_values(self, coupling_alias: str) -> List[pd.Series]:
        """Get the values of all confirmed agents for a coupling variable."""
        values = []
        for participant in self.registered_participants[coupling_alias].values():
            if participant.status == ParticipantStatus.confirmed:
                values.append(participant.variable.value)
        if not values:
            self.logger.warning("Did not get participants values for this round")
        return values

    def send_coupling_values(self, solution: Results):
        """
        Sets the coupling outputs to the data_broker, which automatically sends them.

        Args:
            solution: Output dictionary from optimization_backend.solve().
        """
        self.logger.info("Sending optimal values to other agents.")
        for coup in self.cons_and_exchange:
            self.send_coupling_variable(coup.local, list(solution[coup.name]))

    def _set_mean_coupling_values(self):
        """Computes the current global value of a coupling variable and stores
        it in the matching ADMM variable.

        For couplings the mean over the own value and all confirmed
        participants is stored. For exchange variables the difference between
        the own value and that mean is stored.
        """
        for coupling in self.var_ref.couplings:
            # Get own coupling variable version
            own_coup_var = self._admm_variables[coupling.local]
            own_coup_value = own_coup_var.value
            coup_alias = own_coup_var.alias

            # Get variables values:
            other_coup_values = self.get_participants_values(coup_alias)

            # Add own value
            other_coup_values.append(own_coup_value)

            # Build mean over all values
            other_coup_values = np.array(other_coup_values)
            mean_coup_value = list(np.mean(other_coup_values, axis=0))
            self._admm_variables[coupling.mean].value = mean_coup_value
            self.logger.debug(
                "Updated mean_%s = %s", own_coup_var.name, mean_coup_value
            )

        for exchange in self.var_ref.exchange:
            own_exchange_var = self._admm_variables[exchange.local]
            own_exchange_value = own_exchange_var.value
            exchange_alias = own_exchange_var.alias

            # Get variables values:
            other_coup_values = self.get_participants_values(exchange_alias)

            # Add own value
            other_coup_values.append(own_exchange_value)

            # Build mean over all values
            other_coup_values = np.array(other_coup_values)
            mean_coup_value = np.mean(other_coup_values, axis=0)
            mean_diff = list(own_exchange_value - mean_coup_value)
            self._admm_variables[exchange.mean_diff].value = mean_diff
            self.logger.debug(
                "Updated mean_%s = %s", own_exchange_var.name, mean_coup_value
            )

    def _solve_local_optimization_debug(
        self,
        opt_inputs: Dict[str, AgentVariable],
        current_iteration: int,
        start_time: float,
    ) -> pd.DataFrame:
        """
        USED FOR DEBUGGING, SKIPS CASADI
        Builds a constant result from the configured values instead of
        calling the optimization backend.

        Args:
            opt_inputs: dict with AgentVariables that stay constant between
                optimizations
            current_iteration: current iteration number
            start_time: environment time at start of ADMM algorithm

        Returns:
            DataFrame with one constant column per coupling, control and state.
        """
        updated_couplings = self.collect_couplings_for_optimization()
        opt_inputs.update(updated_couplings)
        self.logger.info("Solving local optimization #%s.", current_iteration)
        self._status = ModuleStatus.optimizing
        grid = self.optimization_backend.coupling_grid
        result = {}
        for coup in self.config.couplings + self.config.controls + self.config.states:
            result[coup.name] = [coup.value] * len(grid)
        result = pd.DataFrame(result)
        self.logger.info("Solved local optimization #%s.", current_iteration)
        self.logger.debug(
            "Coupling variable #%s.", list(result[self.config.couplings[0].name])
        )
        return result

    def send_coupling_variable(self, name: str, value: mpc_datamodels.MPCValue):
        """Sends an admm coupling variable through the data_broker and sets its
        value locally"""
        var = self._admm_variables[name]
        var.value = value
        self.agent.data_broker.send_variable(var)

    def update_lambda(self):
        """
        Performs the update of the lagrange multipliers.
        lambda^k+1 := lambda^k - rho*(z-x_i)
        """
        self.logger.info("Updating lambda variables for all couplings")
        for coupling in self.var_ref.couplings:
            # Get current lambda value:
            coup_name = coupling.name
            lambda_coupling = self._admm_variables[coupling.multiplier].value
            lambda_coupling = np.array(lambda_coupling)
            self.logger.debug("Updating lambda_%s = %s", coup_name, lambda_coupling)
            own_coup_value = self._admm_variables[coupling.local].value
            own_coup_value = np.array(own_coup_value)
            mean_coup_value = self._admm_variables[coupling.mean].value
            mean_coup_value = np.array(mean_coup_value)

            # Calc update
            updated_value = lambda_coupling - self.config.penalty_factor * (
                mean_coup_value - own_coup_value
            )
            updated_value = updated_value.tolist()

            # Store the new multiplier
            self._admm_variables[coupling.multiplier].value = updated_value
            self.logger.info("Updated lambda_%s = %s", coupling.name, updated_value)

        for exchange in self.var_ref.exchange:
            # Get current lambda value:
            lambda_coupling = self._admm_variables[exchange.multiplier].value
            lambda_coupling = np.array(lambda_coupling)
            self.logger.debug("Updating lambda_%s = %s", exchange.name, lambda_coupling)
            own_coup_value = np.array(self._admm_variables[exchange.local].value)
            diff_coup_value = np.array(self._admm_variables[exchange.mean_diff].value)

            # Calc update
            updated_value = lambda_coupling - self.config.penalty_factor * (
                diff_coup_value - own_coup_value
            )
            updated_value = updated_value.tolist()

            # Store the new multiplier
            self._admm_variables[exchange.multiplier].value = updated_value
            self.logger.info("Updated lambda_%s = %s", exchange.name, updated_value)

    def get_results(self) -> Optional[pd.DataFrame]:
        """Read the results that were saved from the optimization backend and
        returns them as Dataframe.

        Returns:
            The results Dataframe, or None if no results file is configured
            or the file was not found. The stats read alongside are discarded.
        """
        results_file = self.optimization_backend.config.results_file
        if results_file is None:
            self.logger.info("No results were saved .")
            return None
        try:
            results, _ = self.read_results_file(results_file)
            return results
        except FileNotFoundError:
            self.logger.error("ADMM results file %s was not found.", results_file)
            return None

    @property
    def penalty_factor_var(self) -> MPCVariable:
        """The configured penalty factor wrapped in an MPCVariable."""
        return MPCVariable(name="penalty_factor", value=self.config.penalty_factor)

    def _setup_var_ref(self) -> adt.VariableReference:
        # Extend var_ref with coupling variables
        return adt.VariableReference.from_config(self.config)

    def _setup_optimization_backend(self) -> ADMMBackend:
        # the ADMM variables are created before the parent sets up the backend
        self._admm_variables = self._create_couplings()
        return super()._setup_optimization_backend()

    def _create_couplings(self) -> dict[str, MPCVariable]:
        """Map coupling variables based on already setup model.

        Every configured coupling and exchange variable is marked as shared
        and gets its own set of ADMM variables.

        Returns:
            Dictionary of all generated ADMM variables, keyed by name.
        """
        _admm_variables: dict[str, MPCVariable] = {}

        for coupling in self.config.couplings:
            coupling.source = Source(agent_id=self.agent.id)
            coupling.shared = True
            coupling_entry = adt.CouplingEntry(name=coupling.name)
            self._add_admm_variables(
                admm_variables=_admm_variables,
                variable=coupling,
                entry=coupling_entry,
                alias=adt.coupling_alias(coupling.alias),
                global_name=coupling_entry.mean,
            )

        # Exchange variables
        for exchange_var in self.config.exchange:
            exchange_var.source = Source(agent_id=self.agent.id)
            exchange_var.shared = True
            exchange_entry = adt.ExchangeEntry(name=exchange_var.name)
            self._add_admm_variables(
                admm_variables=_admm_variables,
                variable=exchange_var,
                entry=exchange_entry,
                alias=adt.exchange_alias(exchange_var.alias),
                global_name=exchange_entry.mean_diff,
            )

        return _admm_variables

    def _add_admm_variables(
        self,
        admm_variables: dict[str, MPCVariable],
        variable: MPCVariable,
        entry: Union[adt.ExchangeEntry, adt.CouplingEntry],
        alias: str,
        global_name: str,
    ) -> None:
        """Create the ADMM variables of one shared variable and subscribe to
        its alias.

        Args:
            admm_variables: Dictionary the new variables are inserted into.
            variable: The configured coupling or exchange variable.
            entry: Entry providing the names of the generated variables.
            alias: Alias under which the local value is shared.
            global_name: Name of the mean (coupling) or mean_diff (exchange)
                variable.
        """
        include = {"unit": variable.unit, "description": variable.description}

        admm_variables[entry.multiplier] = MPCVariable(
            name=entry.multiplier,
            value=[0],
            type="list",
            source=Source(module_id=self.id),
            **include,
        )
        admm_variables[entry.local] = MPCVariable(
            name=entry.local,
            value=convert_to_list(variable.value),
            alias=alias,
            type="list",
            source=Source(agent_id=self.agent.id),
            shared=True,
            **include,
        )
        admm_variables[global_name] = MPCVariable(
            name=global_name,
            type="list",
            source=Source(module_id=self.id),
            **include,
        )
        # NOTE(review): `or` also replaces a configured value of 0 with the
        # midpoint of the bounds, and neginf maps to +1000 - confirm both.
        lag_val = variable.value or np.nan_to_num(
            (variable.ub + variable.lb) / 2, posinf=1000, neginf=1000
        )
        admm_variables[entry.lagged] = MPCVariable(
            name=entry.lagged,
            value=lag_val,
            source=Source(module_id=self.id),
            **include,
        )

        # add callback to receive this value; presumably the deregistration
        # first keeps a repeated setup from registering it twice
        self.agent.data_broker.deregister_callback(
            alias=alias,
            source=None,
            callback=self.participant_callback,
        )
        self.agent.data_broker.register_callback(
            alias=alias,
            source=None,
            callback=self.participant_callback,
        )
        self.registered_participants.update({alias: {}})

    def collect_variables_for_optimization(
        self, var_ref: mpc_datamodels.VariableReference = None
    ) -> dict[str, AgentVariable]:
        """Gets all variables noted in the var ref and puts them in a flat
        dictionary.

        Args:
            var_ref: Variable reference to collect for. Defaults to
                ``self.var_ref``.

        Returns:
            The collected variables merged with ``self._internal_variables``.
        """
        if var_ref is None:
            var_ref = self.var_ref

        # config variables
        variables = {v: self.get(v) for v in var_ref.all_variables()}

        for coup_entry in var_ref.exchange + var_ref.couplings:
            lagged_admm_var = coup_entry.lagged
            original_name = coup_entry.name
            variable = self.get(original_name)
            if original_name in self.history:
                past_values = self.history[original_name]
                variable = MPCVariable(
                    name=lagged_admm_var, value=pd.Series(past_values)
                )
            variables[lagged_admm_var] = variable

        # history variables
        for hist_var in self._lags_dict_seconds:
            past_values = self.history[hist_var]
            if not past_values:
                # if the history of a variable is empty, fallback to the scalar value
                continue

            # create copy to not mess up scalar value of original variable in case
            # fallback is needed
            updated_var = variables[hist_var].copy(
                update={"value": pd.Series(past_values)}
            )
            variables[hist_var] = updated_var

        return {**variables, **self._internal_variables}
class LocalADMMConfig(ADMMConfig):
    """ADMM configuration extended by the two delays read by ``LocalADMM``."""

    # exposed through LocalADMM.sync_delay
    sync_delay: float = 0.001
    # exposed through LocalADMM.registration_delay
    registration_delay: float = 0.1
class LocalADMM(ADMM):
    """ADMM variant that runs the whole algorithm inside ``process`` and
    yields short environment timeouts between the phases of a round."""

    config: LocalADMMConfig

    @property
    def sync_delay(self) -> float:
        """Timeout value used to sync local admm processes. Should be very
        small."""
        return self.config.sync_delay

    @property
    def registration_delay(self) -> float:
        """Timeout value used to wait one on registration. Waits in real time
        (time.sleep)"""
        return self.config.registration_delay

    def process(self):
        """Run ADMM rounds forever, one round per ``config.time_step``.

        Each round performs registration, the ADMM iteration loop and the
        actuation. It then waits for the rest of the time step.
        """
        first_registration = True
        while True:
            start_round = self.env.time

            # Register participants in current round
            self.logger.info("Start registration of round at %s.", self.env.now)
            self._status = ModuleStatus.at_registration
            yield self.env.timeout(self.sync_delay)

            # shift initial values for multipliers and coupling outputs
            self._shift_and_send_coupling_outputs()
            self._shift_multipliers()
            self.pre_computation_hook()
            yield self.env.timeout(self.sync_delay)

            self._status = ModuleStatus.optimizing
            self.logger.info("Finished registration of round")
            yield self.env.timeout(self.sync_delay)

            # the real-time wait is only performed in the very first round
            if first_registration:
                time.sleep(self.registration_delay)
                first_registration = False

            # get optimization inputs
            self._set_mean_coupling_values()
            opt_inputs = self.collect_variables_for_optimization()

            # reset termination criteria
            start_iterations = self.env.time
            admm_iter = 0

            # start the ADMM iteration loop
            while True:
                # Solve local optimization
                start_opt = time.time()
                result = self._solve_local_optimization(
                    opt_inputs=opt_inputs,
                    current_iteration=admm_iter,
                    start_time=start_iterations,
                )

                # admm coordination step
                yield self.env.timeout(self.sync_delay)
                self.send_coupling_values(result)
                yield self.env.timeout(self.sync_delay)
                self._status = ModuleStatus.waiting_for_other_agents
                self._receive_variables(start=start_opt)
                yield self.env.timeout(self.sync_delay)
                self._status = ModuleStatus.updating
                self._set_mean_coupling_values()
                self.update_lambda()
                self.reset_participants_ready()
                yield self.env.timeout(self.sync_delay)

                # check termination
                admm_iter += 1
                if self._check_termination(admm_iter, start_iterations):
                    break

            self.deregister_all_participants()
            self.set_actuation(result)
            self._status = ModuleStatus.sleeping

            time_spent_on_sync_delay = self.env.time - start_round
            # A round that took longer than the time step would give a
            # negative delay, so the remaining wait is clamped at zero.
            remaining = max(self.config.time_step - time_spent_on_sync_delay, 0)
            yield self.env.timeout(remaining)