Coverage for agentlib/modules/communicator/local.py: 71%
17 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1from agentlib.core.datamodels import AgentVariable
2from agentlib.modules.communicator.communicator import (
3 CommunicationDict,
4 LocalCommunicator,
5 LocalCommunicatorConfig,
6 SubscriptionCommunicatorConfig,
7)
8from agentlib.utils import LocalBroker
11class LocalSubscriptionCommunicatorConfig(
12 LocalCommunicatorConfig, SubscriptionCommunicatorConfig
13): ...
16class LocalClient(LocalCommunicator):
17 """
18 This communicator implements the communication between agents via a
19 central process broker.
20 Note: The broker is implemented as singleton this means that all agents must
21 be in the same process!
23 """
25 config: LocalSubscriptionCommunicatorConfig
26 broker: LocalBroker
28 def setup_broker(self):
29 """Use the LocalBroker"""
30 return LocalBroker()
32 @property
33 def subscriptions(self):
34 return self.config.subscriptions
36 def _send(self, payload: CommunicationDict):
37 if self.config.parse_json:
38 self.broker.send(agent_id=payload["source"], message=self.to_json(payload))
39 else:
40 # we have to create the AgentVariable new here, because we modified the
41 # source, and don't want to modify the original variable
42 payload["name"] = payload["alias"]
43 self.broker.send(
44 agent_id=payload["source"], message=AgentVariable(**payload)
45 )