Coverage for agentlib/utils/local_broadcast_broker.py: 56%
9 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1"""
2Module containing a LocalBroadcastBroker that
3enables local broadcast communication.
4"""
6from typing import Union
8from agentlib.utils.broker import Broker
9from agentlib.core import AgentVariable
class LocalBroadcastBroker(Broker):
    """Broker that relays a message to every registered client except the sender."""

    def broadcast(self, agent_id: str, message: Union[bytes, AgentVariable]):
        """Deliver ``message`` to all clients not belonging to ``agent_id``.

        Args:
            agent_id: Id of the sending agent; clients whose
                ``source.agent_id`` equals this value are skipped.
            message: Payload handed unchanged to each recipient's
                ``receive`` method.
        """
        # Hold the lock for the whole delivery so that clients being added
        # or removed cannot change the collection while it is walked.
        with self.lock:
            recipients = list(self._clients)
            for recipient in recipients:
                if recipient.source.agent_id == agent_id:
                    # Never echo a message back to the agent that sent it.
                    continue
                recipient.receive(message)