Source code for agentlib_mpc.modules.dmpc.employee

import logging
from dataclasses import asdict
import abc

from pydantic import Field

from agentlib.core import (
from agentlib.core.datamodels import Source
from agentlib_mpc.data_structures.coordinator_datatypes import RegistrationMessage
import agentlib_mpc.data_structures.coordinator_datatypes as cdt

logger = logging.getLogger(__name__)

[docs]class MiniEmployeeConfig(BaseModuleConfig): request_frequency: float = Field( default=1, description="Wait time between signup_requests" ) coordinator: Source = Field(description="Define the agents coordinator") messages_in: AgentVariables = [ AgentVariable(name=cdt.REGISTRATION_C2A), AgentVariable(name=cdt.START_ITERATION_C2A), AgentVariable(name=cdt.OPTIMIZATION_C2A), ] messages_out: AgentVariables = [ AgentVariable(name=cdt.REGISTRATION_A2C), AgentVariable(name=cdt.START_ITERATION_A2C), AgentVariable(name=cdt.OPTIMIZATION_A2C), ] registration_interval: float = Field( default=10, ge=0, description="Interval in seconds after which a registration attempt is made.", ) shared_variable_fields: list[str] = ["messages_out"]
[docs]class MiniEmployee(BaseModule): config: MiniEmployeeConfig def __init__(self, *, config: dict, agent: Agent): super().__init__(config=config, agent=agent) self._registered_coordinator: Source = None self._start_optimization_at: float = 0
[docs] def process(self): # send registration request to coordinator timeout = self.config.registration_interval while True: if not self._registered_coordinator: self.set(cdt.REGISTRATION_A2C, True) yield self.env.timeout(timeout)
[docs] def register_callbacks(self): # callback used for registration process coordinator_agent = Source(agent_id=self.config.coordinator.agent_id) self.agent.data_broker.register_callback( alias=cdt.REGISTRATION_C2A, source=coordinator_agent, callback=self.registration_callback, ) # # call back for iteration start ( self.agent.data_broker.register_callback( alias=cdt.START_ITERATION_C2A, source=coordinator_agent, callback=self.init_iteration_callback, ) # # call back for optimization ( self.agent.data_broker.register_callback( alias=cdt.OPTIMIZATION_C2A, source=coordinator_agent, callback=self.optimize, )
[docs] def pre_computation_hook(self): """ This method is called in every computation step before the optimization starts. Overwrite this method in a derived subclass if you want to take some actions each time before the optimal control problem is solved. """ pass
[docs] def init_iteration_callback(self, variable: AgentVariable): """ Callback that processes the coordinators 'startIteration' flag. Args: variable: """ # value is True on start if variable.value: self._start_optimization_at = self.env.time # new measurement self.get_new_measurement() # shift trajectories self.shift_trajectories() # custom function which can be overloaded to do stuff before a step self.pre_computation_hook() self.set(cdt.START_ITERATION_A2C, True) self.logger.debug("Sent 'StartIteration' True.") # value is False on convergence/iteration limit else: self._finish_optimization()
[docs] def get_new_measurement(self): """ Retrieve new measurement from relevant sensors Returns: """ ...
# raise NotImplementedError # return self.collect_variables_for_optimization() @abc.abstractmethod def _finish_optimization(self): """ Finalize an iteration. Usually, this includes setting the actuation. Returns: """
[docs] @abc.abstractmethod def optimize(self, variable: AgentVariable): """ Performs the optimization given the information from the coordinator. Replies with local information. Returns: """ variables = cdt.OptimizationData.from_dict(variable.value) # perform optimization # send optimizationData back to coordinator to signal finished # optimization value = variables.to_dict() self.logger.debug("Sent optimal solution.") self.set(name=cdt.OPTIMIZATION_A2C, value=value)
[docs] def shift_trajectories(self): """ Shifts algorithm specific trajectories. Returns: """ ...
# raise NotImplementedError
[docs] @abc.abstractmethod def registration_callback(self, variable: AgentVariable): """callback for registration""" f"receiving {}={variable.value} from {variable.source}" ) # global parameters to define optimisation problem value = RegistrationMessage(**variable.value) # Decide if message from coordinator is for this agent if not (value.agent_id == self.source.agent_id): return self.OptimOpts = value.opts status = True answer = RegistrationMessage(status=cdt.AgentStatus.standby) self._registered_coordinator = variable.source if status: self.set("registrationOut", asdict(answer))