Coverage for agentlib/modules/communicator/local_broadcast.py: 0%

13 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-30 13:00 +0000

1from agentlib.core.datamodels import AgentVariable 

2from agentlib.modules.communicator.communicator import ( 

3 CommunicationDict, 

4 LocalCommunicatorConfig, 

5 LocalCommunicator, 

6) 

7from agentlib.utils import LocalBroadcastBroker 

8 

9 

10class LocalBroadcastClient(LocalCommunicator): 

11 """ 

12 This communicator implements the communication between agents via a 

13 broadcast broker central process. 

14 Note: The broker is implemented as singleton. This means that all agents must 

15 be in the same process! 

16 """ 

17 

18 broker: LocalBroadcastBroker 

19 config: LocalCommunicatorConfig 

20 

21 def setup_broker(self): 

22 """Use the LocalBroadCastBroker""" 

23 return LocalBroadcastBroker() 

24 

25 def _send(self, payload: CommunicationDict): 

26 if self.config.parse_json: 

27 self.broker.broadcast(payload["source"], self.to_json(payload)) 

28 else: 

29 payload["name"] = payload["alias"] 

30 self.broker.broadcast(payload["source"], AgentVariable(**payload))