Source code for agentlib.modules.communicator.local_multiprocessing

import time
import multiprocessing
import threading

from pydantic import Field
from ipaddress import IPv4Address

from agentlib.core import Agent
from agentlib.core.datamodels import AgentVariable
from agentlib.modules.communicator.communicator import (
    Communicator,
    CommunicationDict,
    CommunicatorConfig,
)
from agentlib.utils import multi_processing_broker


class MultiProcessingBroadcastClientConfig(CommunicatorConfig):
    """Connection settings for ``MultiProcessingBroadcastClient``.

    Describes where the broker can be reached (``ipv4`` and ``port``) and
    the ``authkey`` used when connecting to it.
    """

    # NOTE(review): the default is a plain string rather than an IPv4Address
    # instance; whether it gets coerced depends on the validation settings of
    # the base config -- confirm before relying on the attribute's type.
    ipv4: IPv4Address = Field(
        description="IP Address for the communication server. Defaults to localhost.",
        default="127.0.0.1",
    )
    port: int = Field(
        description="Port for setting up the connection with the broker.",
        default=50000,
    )
    authkey: bytes = Field(
        description="Authorization key for the connection with the broker.",
        default=b"useTheAgentlib",
    )
class MultiProcessingBroadcastClient(Communicator):
    """
    This communicator implements the communication between agents via a
    central process broker.

    On construction the client connects to the broker manager, creates two
    one-directional pipes (client -> broker and broker -> client), registers
    the broker-side ends through the manager's signup queue and then blocks
    until the first message arrives on the broker -> client pipe.
    """

    config: MultiProcessingBroadcastClientConfig

    def __init__(self, config: dict, agent: Agent):
        """Connect to the broker and sign this agent up.

        Args:
            config: Configuration dict, forwarded to the base class.
            agent: The agent this communicator belongs to.

        Raises:
            RuntimeError: If the broker keeps refusing the connection for
                more than 10 seconds.
        """
        super().__init__(config=config, agent=agent)
        manager = multi_processing_broker.BrokerManager(
            # The config field is typed as IPv4Address, but the address tuple
            # needs a plain string host. str() is a no-op for the string
            # default and converts an IPv4Address to dotted notation.
            address=(str(self.config.ipv4), self.config.port),
            authkey=self.config.authkey,
        )

        # Retry while the broker refuses the connection (e.g. not up yet).
        started_wait = time.time()
        while True:
            try:
                manager.connect()
                break
            except ConnectionRefusedError as err:
                time.sleep(0.01)
                if time.time() - started_wait > 10:
                    raise RuntimeError("Could not connect to the server.") from err

        signup_queue = manager.get_queue()

        # Two simplex pipes: one per direction.
        client_read, broker_write = multiprocessing.Pipe(duplex=False)
        broker_read, client_write = multiprocessing.Pipe(duplex=False)

        # Hand the broker-side ends to the broker via the signup queue.
        signup = multi_processing_broker.MPClient(
            agent_id=self.agent.id, read=broker_read, write=broker_write
        )
        signup_queue.put(signup)

        # All four ends are stored so that terminate() can close them.
        self._client_write = client_write
        self._broker_write = broker_write
        self._client_read = client_read
        self._broker_read = broker_read

        # ensure the broker has set up the connection and sends its ack
        self._client_read.recv()

    def process(self):
        """Only start the loop once the env is running.

        Starts the message handler in a daemon thread, registers that thread
        with the agent and then yields a fresh environment event that is not
        triggered anywhere in this method.
        """
        _thread = threading.Thread(
            target=self._message_handler, name=str(self.source), daemon=True
        )
        _thread.start()
        self.agent.register_thread(thread=_thread)
        yield self.env.event()

    def _message_handler(self):
        """Reads messages that were put in the message queue.

        Runs until the pipe can no longer be read. Each received message is
        parsed into an ``AgentVariable`` and passed to the agent's data
        broker.
        """
        while True:
            try:
                msg = self._client_read.recv()
            except EOFError:
                # Nothing left to receive and the other end was closed.
                break
            except OSError:
                # terminate() closes this end of the pipe while this thread
                # may still be reading from it; treat that as a normal stop
                # instead of letting the thread die with a traceback.
                self.logger.debug("Connection closed, stopping message handler.")
                break
            variable = AgentVariable.from_json(msg)
            self.logger.debug(f"Received variable {variable.name}.")
            self.agent.data_broker.send_variable(variable)

    def terminate(self):
        """Closes all of the connections."""
        # Terminating is important when running multiple
        # simulations/environments, otherwise the broker will keep spamming all
        # agents from the previous simulation, potentially filling their queues.
        for connection in (
            self._client_write,
            self._client_read,
            self._broker_write,
            self._broker_read,
        ):
            connection.close()
        super().terminate()

    def _send(self, payload: CommunicationDict):
        """Sends a variable to the Broker.

        The payload is serialized with ``self.to_json`` and wrapped in a
        broker ``Message`` tagged with the payload's ``"source"`` entry.
        """
        agent_id = payload["source"]
        msg = multi_processing_broker.Message(
            agent_id=agent_id, payload=self.to_json(payload)
        )
        self._client_write.send(msg)