# Source code for agentlib.utils.multi_processing_broker

"""
Module containing a MultiProcessingBroker that
enables communication across different processes.
"""

import json
import multiprocessing
from ipaddress import IPv4Address
from multiprocessing.managers import SyncManager
import threading
import time
from collections import namedtuple
from typing import Union
import logging

from pathlib import Path
from pydantic import BaseModel, Field, FilePath

from .broker import Broker

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Record describing one connected client: its agent id plus the two connection
# ends the broker uses to receive from (`read`) and send to (`write`) it.
MPClient = namedtuple("MPClient", "agent_id read write")
# Envelope for a message travelling to the broker: the sender's agent id and
# the payload that is forwarded to the other clients.
Message = namedtuple("Message", "agent_id payload")


class BrokerManager(SyncManager):
    """Manager type with the ``get_queue`` typeid registered.

    The typeid is registered without a callable, so this class can only be
    used to obtain a proxy to a queue that is served by another manager.
    """


# Registered on the class right after its definition so that every instance
# exposes ``get_queue()``.
BrokerManager.register("get_queue")
class MultiProcessingBrokerConfig(BaseModel):
    """Class describing the configuration options for the MultiProcessingBroker."""

    # NOTE: the default is a plain string. pydantic does not validate defaults
    # unless `validate_default` is enabled, so this attribute may hold either a
    # `str` (default) or an `IPv4Address` (validated user input). Consumers
    # should convert it with `str(...)` before handing it to socket APIs.
    ipv4: IPv4Address = Field(
        default="127.0.0.1",
        description="IP Address for the communication server. Defaults to localhost.",
    )
    # TCP port of the manager server.
    port: int = Field(
        default=50000,
        description="Port for setting up the connection with the server.",
    )
    # Shared secret passed as `authkey` to the multiprocessing manager.
    authkey: bytes = Field(
        default=b"useTheAgentlib",
        description="Authorization key for the connection with the broker.",
    )
# Inputs accepted as broker configuration: a ready config object, a dict of
# options, or a path (str or pydantic FilePath) to a JSON file with the options.
ConfigTypes = Union[MultiProcessingBrokerConfig, dict, str, FilePath]
class MultiProcessingBroker(Broker):
    """
    Singleton which acts as a broker for distributed simulations among multiple
    local processes.

    On creation, a daemon process is started that serves a multiprocessing
    manager owning a sign-up queue. Local clients put an ``MPClient`` record
    (agent id plus the connection ends from ``multiprocessing.Pipe()``) on
    that queue to register. For each registered client, a daemon thread waits
    for incoming ``Message`` objects and redistributes their payload to all
    other clients.

    Relies on ``self.lock`` and ``self._clients`` being provided by the
    ``Broker`` base class.
    """

    def __init__(self, config: ConfigTypes = None):
        """Start the manager server process and the sign-up handler thread.

        Args:
            config: Configuration object, dict of options or path to a JSON
                file. If None, the default configuration is used.
        """
        super().__init__()
        self.config = MultiProcessingBrokerConfig() if config is None else config

        # The server runs in its own daemon process, so it is terminated
        # together with the process that created the broker.
        server = multiprocessing.Process(
            target=self._server,
            name="Broker_Server",
            args=(self.config,),
            daemon=True,
        )
        server.start()

        signup_handler = threading.Thread(
            target=self._signup_handler, daemon=True, name="Broker_SignUp"
        )
        signup_handler.start()

    @property
    def config(self) -> MultiProcessingBrokerConfig:
        """Return the config of the environment"""
        return self._config

    @config.setter
    def config(self, config: ConfigTypes):
        """Set the config/settings of the environment

        A ``MultiProcessingBrokerConfig`` instance is stored as is. A str or
        Path pointing to an existing file is read as JSON first. Everything
        else is handed to ``MultiProcessingBrokerConfig.model_validate``.
        """
        if isinstance(config, MultiProcessingBrokerConfig):
            self._config = config
            return
        if isinstance(config, (str, Path)) and Path(config).exists():
            with open(config, "r") as f:
                config = json.load(f)
        self._config = MultiProcessingBrokerConfig.model_validate(config)

    @staticmethod
    def _server(config: MultiProcessingBrokerConfig):
        """Creates the Manager object which owns the queue and lets it serve
        forever.

        Args:
            config: Broker configuration providing address, port and authkey.
        """
        from multiprocessing.managers import BaseManager
        from queue import Queue

        queue = Queue()

        class QueueManager(BaseManager):
            pass

        QueueManager.register("get_queue", callable=lambda: queue)
        # config.ipv4 may be an IPv4Address (validated input) or a str
        # (unvalidated default); the socket layer only accepts a str host.
        m = QueueManager(
            address=(str(config.ipv4), config.port), authkey=config.authkey
        )
        s = m.get_server()
        s.serve_forever()

    def _signup_handler(self):
        """Connects to the manager queue and processes the signup requests.
        Starts a child thread listening to messages from each client.

        Raises:
            RuntimeError: If the manager server cannot be reached within
                10 seconds.
        """
        from multiprocessing.managers import BaseManager

        class QueueManager(BaseManager):
            pass

        QueueManager.register("get_queue")
        # See _server: the host has to be passed on as a plain string.
        m = QueueManager(
            address=(str(self.config.ipv4), self.config.port),
            authkey=self.config.authkey,
        )

        # The server process needs a moment to come up, so retry until it
        # accepts the connection. A monotonic clock keeps the timeout
        # unaffected by adjustments of the system time.
        deadline = time.monotonic() + 10
        while True:
            try:
                m.connect()
                break
            except ConnectionRefusedError as err:
                time.sleep(0.01)
                if time.monotonic() > deadline:
                    raise RuntimeError("Could not connect to server.") from err

        signup_queue = m.get_queue()

        while True:
            try:
                client = signup_queue.get()
            except ConnectionResetError:
                logger.info("Multiprocessing Broker disconnected.")
                break

            with self.lock:
                self._clients.add(client)

            # send the client an ack its messages are now being received
            client.write.send(1)
            threading.Thread(
                target=self._client_loop,
                args=(client,),
                name=f"MPBroker_{client.agent_id}",
                daemon=True,
            ).start()

    def _client_loop(self, client: MPClient):
        """Receives messages from a client and redistributes them.

        Runs until the client's connection signals EOF, then removes the
        client from the set of registered clients.

        Args:
            client: The registered client to listen to.
        """
        while True:
            try:
                msg: Message = client.read.recv()
            except EOFError:
                with self.lock:
                    self._clients.remove(client)
                break
            self.send(message=msg.payload, source=msg.agent_id)

    def send(self, source, message):
        """
        Send the given message to all clients except the one it came from.

        Args:
            source: Agent id of the sender. Clients with this agent id do
                not receive the message.
            message: The message to send
        """
        # lock is required so the clients loop does not change size during
        # iteration if clients are added or removed
        with self.lock:
            for client in list(self._clients):
                if client.agent_id == source:
                    continue
                try:
                    client.write.send(message)
                except BrokenPipeError:
                    # Delivery is best effort: a client whose pipe is closed
                    # is skipped here; _client_loop removes it on EOF.
                    logger.debug(
                        "Could not deliver message to client %s, pipe is closed.",
                        client.agent_id,
                    )