Coverage for agentlib/modules/communicator/local_broadcast.py: 0%
13 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-30 13:00 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-30 13:00 +0000
1from agentlib.core.datamodels import AgentVariable
2from agentlib.modules.communicator.communicator import (
3 CommunicationDict,
4 LocalCommunicatorConfig,
5 LocalCommunicator,
6)
7from agentlib.utils import LocalBroadcastBroker
10class LocalBroadcastClient(LocalCommunicator):
11 """
12 This communicator implements the communication between agents via a
13 broadcast broker central process.
14 Note: The broker is implemented as singleton. This means that all agents must
15 be in the same process!
16 """
18 broker: LocalBroadcastBroker
19 config: LocalCommunicatorConfig
21 def setup_broker(self):
22 """Use the LocalBroadCastBroker"""
23 return LocalBroadcastBroker()
25 def _send(self, payload: CommunicationDict):
26 if self.config.parse_json:
27 self.broker.broadcast(payload["source"], self.to_json(payload))
28 else:
29 payload["name"] = payload["alias"]
30 self.broker.broadcast(payload["source"], AgentVariable(**payload))