agentlib_mpc.modules.dmpc package

Subpackages

Submodules

agentlib_mpc.modules.dmpc.coordinator module

class agentlib_mpc.modules.dmpc.coordinator.Coordinator(*, config: dict, agent: Agent)[source]

Bases: BaseModule

Class implementing the base coordination for distributed MPC

property all_finished

Returns: True, if there are no busy agents, else False

init_iteration_callback(variable: AgentVariable)[source]

Processes and Agents InitIteration confirmation. :param variable:

Returns:

optim_results_callback(variable: AgentVariable)[source]

Saves the results of a local optimization. :param variable:

Returns:

process()[source]

This abstract method must be implemented in order to sync the module with the other processes of the agent and the whole MAS.

register_callbacks()[source]
registration_callback(variable: AgentVariable)[source]
trigger_optimizations()[source]

Triggers the optimization for all agents with status ready. Returns:

pydantic model agentlib_mpc.modules.dmpc.coordinator.CoordinatorConfig[source]

Bases: BaseModuleConfig

Config:
  • arbitrary_types_allowed: bool = True

  • validate_assignment: bool = True

  • extra: str = forbid

  • frozen: bool = True

Fields:
Validators:
field maxIter: int = 10

Maximum number of iterations

field messages_in: List[AgentVariable] = [AgentVariable(name='registration_agent_to_coordinator', type=None, timestamp=None, unit='Not defined', description='Not defined', ub=inf, lb=-inf, clip=False, allowed_values=[], value=None, alias='registration_agent_to_coordinator', source=Source(agent_id=None, module_id=None), shared=None, rdf_class=None), AgentVariable(name='startIteration_agent_to_coordinator', type=None, timestamp=None, unit='Not defined', description='Not defined', ub=inf, lb=-inf, clip=False, allowed_values=[], value=None, alias='startIteration_agent_to_coordinator', source=Source(agent_id=None, module_id=None), shared=None, rdf_class=None), AgentVariable(name='optimization_agent_to_coordinator', type=None, timestamp=None, unit='Not defined', description='Not defined', ub=inf, lb=-inf, clip=False, allowed_values=[], value=None, alias='optimization_agent_to_coordinator', source=Source(agent_id=None, module_id=None), shared=None, rdf_class=None)]
field messages_out: List[AgentVariable] = [AgentVariable(name='registration_coordinator_to_agent', type=None, timestamp=None, unit='Not defined', description='Not defined', ub=inf, lb=-inf, clip=False, allowed_values=[], value=None, alias='registration_coordinator_to_agent', source=Source(agent_id=None, module_id=None), shared=None, rdf_class=None), AgentVariable(name='startIteration_coordinator_to_agent', type=None, timestamp=None, unit='Not defined', description='Not defined', ub=inf, lb=-inf, clip=False, allowed_values=[], value=None, alias='startIteration_coordinator_to_agent', source=Source(agent_id=None, module_id=None), shared=None, rdf_class=None), AgentVariable(name='optimization_coordinator_to_agent', type=None, timestamp=None, unit='Not defined', description='Not defined', ub=inf, lb=-inf, clip=False, allowed_values=[], value=None, alias='optimization_coordinator_to_agent', source=Source(agent_id=None, module_id=None), shared=None, rdf_class=None)]
field shared_variable_fields: list[str] = ['messages_out']
Validated by:
  • check_valid_fields

field time_out_non_responders: float = 1

Maximum wait time for subsystems in seconds

model_post_init(context: Any, /) None

We need to both initialize private attributes and call the user-defined model_post_init method.

agentlib_mpc.modules.dmpc.employee module

class agentlib_mpc.modules.dmpc.employee.MiniEmployee(*, config: dict, agent: Agent)[source]

Bases: BaseModule

get_new_measurement()[source]

Retrieve new measurement from relevant sensors Returns:

init_iteration_callback(variable: AgentVariable)[source]

Callback that processes the coordinators ‘startIteration’ flag. :param variable:

abstract optimize(variable: AgentVariable)[source]

Performs the optimization given the information from the coordinator. Replies with local information. Returns:

pre_computation_hook()[source]

This method is called in every computation step before the optimization starts. Overwrite this method in a derived subclass if you want to take some actions each time before the optimal control problem is solved.

process()[source]

This abstract method must be implemented in order to sync the module with the other processes of the agent and the whole MAS.

register_callbacks()[source]
abstract registration_callback(variable: AgentVariable)[source]

callback for registration

shift_trajectories()[source]

Shifts algorithm specific trajectories. Returns:

pydantic model agentlib_mpc.modules.dmpc.employee.MiniEmployeeConfig[source]

Bases: BaseModuleConfig

Config:
  • arbitrary_types_allowed: bool = True

  • validate_assignment: bool = True

  • extra: str = forbid

  • frozen: bool = True

Fields:
Validators:
field coordinator: Source [Required]

Define the agents coordinator

field messages_in: List[AgentVariable] = [AgentVariable(name='registration_coordinator_to_agent', type=None, timestamp=None, unit='Not defined', description='Not defined', ub=inf, lb=-inf, clip=False, allowed_values=[], value=None, alias='registration_coordinator_to_agent', source=Source(agent_id=None, module_id=None), shared=None, rdf_class=None), AgentVariable(name='startIteration_coordinator_to_agent', type=None, timestamp=None, unit='Not defined', description='Not defined', ub=inf, lb=-inf, clip=False, allowed_values=[], value=None, alias='startIteration_coordinator_to_agent', source=Source(agent_id=None, module_id=None), shared=None, rdf_class=None), AgentVariable(name='optimization_coordinator_to_agent', type=None, timestamp=None, unit='Not defined', description='Not defined', ub=inf, lb=-inf, clip=False, allowed_values=[], value=None, alias='optimization_coordinator_to_agent', source=Source(agent_id=None, module_id=None), shared=None, rdf_class=None)]
field messages_out: List[AgentVariable] = [AgentVariable(name='registration_agent_to_coordinator', type=None, timestamp=None, unit='Not defined', description='Not defined', ub=inf, lb=-inf, clip=False, allowed_values=[], value=None, alias='registration_agent_to_coordinator', source=Source(agent_id=None, module_id=None), shared=None, rdf_class=None), AgentVariable(name='startIteration_agent_to_coordinator', type=None, timestamp=None, unit='Not defined', description='Not defined', ub=inf, lb=-inf, clip=False, allowed_values=[], value=None, alias='startIteration_agent_to_coordinator', source=Source(agent_id=None, module_id=None), shared=None, rdf_class=None), AgentVariable(name='optimization_agent_to_coordinator', type=None, timestamp=None, unit='Not defined', description='Not defined', ub=inf, lb=-inf, clip=False, allowed_values=[], value=None, alias='optimization_agent_to_coordinator', source=Source(agent_id=None, module_id=None), shared=None, rdf_class=None)]
field registration_interval: float = 10

Interval in seconds after which a registration attempt is made.

Constraints:
  • ge = 0

field request_frequency: float = 1

Wait time between signup_requests

field shared_variable_fields: list[str] = ['messages_out']
Validated by:
  • check_valid_fields

model_post_init(context: Any, /) None

We need to both initialize private attributes and call the user-defined model_post_init method.

Module contents

class agentlib_mpc.modules.dmpc.DistributedMPC(config: dict, agent: Agent)[source]

Bases: MPC

Base class which defines common interfaces among all distributed mpc approaches (either optimization based, game theory based or some other).

pydantic model agentlib_mpc.modules.dmpc.DistributedMPCConfig[source]

Bases: MPCConfig

Base config class with common configurations

Config:
  • arbitrary_types_allowed: bool = True

  • validate_assignment: bool = True

  • extra: str = forbid

  • frozen: bool = True

Fields:

Validators:

model_post_init(context: Any, /) None

We need to both initialize private attributes and call the user-defined model_post_init method.