Coverage for agentlib/utils/local_broker.py: 56%
9 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1"""
2Module containing a LocalBroker that
3enables local communication with subscriptions.
4"""
6from typing import Union
8from .broker import Broker
9from agentlib.core import datamodels
class LocalBroker(Broker):
    """Broker that hands messages directly to its registered clients."""

    def send(self, agent_id: str, message: Union[bytes, datamodels.AgentVariable]):
        """
        Forward ``message`` to every client subscribed to ``agent_id``.

        Each client in ``self._clients`` is inspected in turn. For every
        entry of ``client.subscriptions`` that compares equal to
        ``agent_id``, ``client.receive(message)`` is called once, so a
        client listing the same source several times receives the
        message once per matching entry.

        Args:
            agent_id: Identifier of the message source, compared against
                each subscription entry.
            message: The payload passed unchanged to ``client.receive``.

        Returns:
            None
        """
        for recipient in self._clients:
            # Lazy generator: entries are compared one at a time, right
            # before the corresponding delivery, as in a plain nested loop.
            matching_entries = (
                entry for entry in recipient.subscriptions if entry == agent_id
            )
            for _ in matching_entries:
                recipient.receive(message)