Coverage for agentlib/modules/communicator/local.py: 71%

17 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1from agentlib.core.datamodels import AgentVariable 

2from agentlib.modules.communicator.communicator import ( 

3 CommunicationDict, 

4 LocalCommunicator, 

5 LocalCommunicatorConfig, 

6 SubscriptionCommunicatorConfig, 

7) 

8from agentlib.utils import LocalBroker 

9 

10 

class LocalSubscriptionCommunicatorConfig(
    LocalCommunicatorConfig,
    SubscriptionCommunicatorConfig,
):
    """Configuration for the local client with subscription support.

    Combines ``LocalCommunicatorConfig`` and ``SubscriptionCommunicatorConfig``
    without declaring any fields of its own; every option is inherited from
    the two base classes.
    """

14 

15 

class LocalClient(LocalCommunicator):
    """
    Communicator that exchanges messages between agents through a central
    process broker.

    Note: The broker is implemented as a singleton, which means that all
    agents must live in the same process!
    """

    config: LocalSubscriptionCommunicatorConfig
    broker: LocalBroker

    def setup_broker(self):
        """Return the ``LocalBroker`` used by this communicator."""
        return LocalBroker()

    @property
    def subscriptions(self):
        """Subscriptions as given by ``config.subscriptions``."""
        return self.config.subscriptions

    def _send(self, payload: CommunicationDict):
        """Hand ``payload`` to the broker under the id in ``payload["source"]``.

        When ``config.parse_json`` is set, the payload is serialized with
        ``to_json`` first. Otherwise an ``AgentVariable`` is built from it.
        In that second case ``payload["name"]`` is overwritten in place with
        ``payload["alias"]`` before the variable is constructed.
        """
        if not self.config.parse_json:
            # A brand-new AgentVariable is created from the payload rather
            # than reusing the sender's object: the source was modified and
            # the original variable must stay untouched.
            payload["name"] = payload["alias"]
            self.broker.send(
                agent_id=payload["source"],
                message=AgentVariable(**payload),
            )
            return

        self.broker.send(
            agent_id=payload["source"],
            message=self.to_json(payload),
        )