Source code for agentlib.utils.local_broadcast_broker
"""Module containing a LocalBroadcastBroker that enables local broadcast communication."""fromtypingimportUnionfromagentlib.utils.brokerimportBrokerfromagentlib.coreimportAgentVariable
class LocalBroadcastBroker(Broker):
    """Broker that relays each broadcast message to its registered clients."""

    def broadcast(self, agent_id: str, message: Union[bytes, AgentVariable]):
        """Hand ``message`` to every client except those of the sending agent.

        Args:
            agent_id: ID of the agent that sends the message. Clients whose
                ``source.agent_id`` equals this value are skipped.
            message: The payload, passed unchanged to each receiving
                client's ``receive`` method.
        """
        # Hold the lock while delivering so that clients being added or
        # removed concurrently cannot change the collection mid-iteration.
        # NOTE(review): ``self.lock`` and ``self._clients`` are presumably
        # provided by the ``Broker`` base class -- confirm there.
        with self.lock:
            snapshot = list(self._clients)
            for recipient in snapshot:
                owner_id = recipient.source.agent_id
                if owner_id != agent_id:
                    recipient.receive(message)