Coverage for agentlib/utils/local_broker.py: 56%

9 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1""" 

2Module containing a LocalBroker that 

3enables local communication with subscriptions. 

4""" 

5 

6from typing import Union 

7 

8from .broker import Broker 

9from agentlib.core import datamodels 

10 

11 

12class LocalBroker(Broker): 

13 """Local broker class which sends messages""" 

14 

15 def send(self, agent_id: str, message: Union[bytes, datamodels.AgentVariable]): 

16 """ 

17 Send the given message to all clients if the source 

18 matches. 

19 Args: 

20 agent_id: Source to match 

21 message: The message to send 

22 

23 Returns: 

24 

25 """ 

26 for client in self._clients: 

27 for sub in client.subscriptions: 

28 if sub == agent_id: 

29 client.receive(message)