Coverage for agentlib/utils/local_broadcast_broker.py: 56%

9 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1""" 

2Module containing a LocalBroadcastBroker that 

3enables local broadcast communication. 

4""" 

5 

6from typing import Union 

7 

8from agentlib.utils.broker import Broker 

9from agentlib.core import AgentVariable 

10 

11 

12class LocalBroadcastBroker(Broker): 

13 """Local broadcast broker class which broadcasts messages""" 

14 

15 def broadcast(self, agent_id: str, message: Union[bytes, AgentVariable]): 

16 """Broadcast message object to all agents but itself""" 

17 

18 # lock is required so the clients loop does not change size during 

19 # iteration if clients are added or removed 

20 with self.lock: 

21 for client in list(self._clients): 

22 if client.source.agent_id != agent_id: 

23 client.receive(message)