Source code for agentlib.modules.communicator.local

from agentlib.core.datamodels import AgentVariable
from agentlib.modules.communicator.communicator import (
    CommunicationDict,
    LocalCommunicator,
    LocalCommunicatorConfig,
    SubscriptionCommunicatorConfig,
)
from agentlib.utils import LocalBroker


[docs]class LocalSubscriptionCommunicatorConfig(
LocalCommunicatorConfig, SubscriptionCommunicatorConfig ): ...
[docs]class LocalClient(LocalCommunicator): """ This communicator implements the communication between agents via a central process broker. Note: The broker is implemented as singleton this means that all agents must be in the same process! """ config: LocalSubscriptionCommunicatorConfig broker: LocalBroker
[docs] def setup_broker(self): """Use the LocalBroker""" return LocalBroker()
@property def subscriptions(self): return self.config.subscriptions def _send(self, payload: CommunicationDict): if self.config.parse_json: self.broker.send(agent_id=payload["source"], message=self.to_json(payload)) else: # we have to create the AgentVariable new here, because we modified the # source, and don't want to modify the original variable payload["name"] = payload["alias"] self.broker.send( agent_id=payload["source"], message=AgentVariable(**payload) )