Coverage for agentlib/modules/communicator/local_multiprocessing.py: 96%
55 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1import time
2import multiprocessing
3import threading
5from pydantic import Field
6from ipaddress import IPv4Address
8from agentlib.core import Agent
9from agentlib.core.datamodels import AgentVariable
10from agentlib.modules.communicator.communicator import (
11 Communicator,
12 CommunicationDict,
13 CommunicatorConfig,
14)
15from agentlib.utils import multi_processing_broker
class MultiProcessingBroadcastClientConfig(CommunicatorConfig):
    """Settings used by the multiprocessing broadcast client to reach the broker.

    The fields hold the broker address (IP and port) and the authorization
    key that is passed along when the connection is established.
    """

    # Host part of the broker address.
    ipv4: IPv4Address = Field(
        description="IP Address for the communication server. Defaults to localhost.",
        default="127.0.0.1",
    )
    # Port part of the broker address.
    port: int = Field(
        description="Port for setting up the connection with the broker.",
        default=50_000,
    )
    # Key handed to the broker manager when connecting.
    authkey: bytes = Field(
        description="Authorization key for the connection with the broker.",
        default=b"useTheAgentlib",
    )
class MultiProcessingBroadcastClient(Communicator):
    """
    This communicator implements the communication between agents via a
    central process broker.

    On construction the client connects to the broker manager, creates two
    one-directional pipes (broker -> client and client -> broker), hands the
    broker-side ends to the broker through its signup queue and blocks until
    the broker has sent its acknowledgement.
    """

    config: MultiProcessingBroadcastClientConfig

    def __init__(self, config: dict, agent: Agent):
        """Connect to the broker and register this agent.

        Args:
            config: Configuration of the communicator module.
            agent: The agent this module belongs to.

        Raises:
            RuntimeError: If the broker keeps refusing the connection for
                more than 10 seconds.
        """
        super().__init__(config=config, agent=agent)
        manager = multi_processing_broker.BrokerManager(
            # The field is annotated as IPv4Address, so the configured value
            # may be an IPv4Address object; socket addresses need a str host.
            address=(str(self.config.ipv4), self.config.port),
            authkey=self.config.authkey,
        )

        # Retry while the connection is refused. A monotonic clock is used so
        # that system clock adjustments cannot shorten or extend the timeout.
        started_wait = time.monotonic()
        while True:
            try:
                manager.connect()
                break
            except ConnectionRefusedError as err:
                time.sleep(0.01)
                if time.monotonic() - started_wait > 10:
                    raise RuntimeError("Could not connect to the server.") from err

        signup_queue = manager.get_queue()
        # Pipe(duplex=False) returns (receiving end, sending end).
        client_read, broker_write = multiprocessing.Pipe(duplex=False)
        broker_read, client_write = multiprocessing.Pipe(duplex=False)
        signup = multi_processing_broker.MPClient(
            agent_id=self.agent.id, read=broker_read, write=broker_write
        )

        signup_queue.put(signup)

        self._client_write = client_write
        self._broker_write = broker_write
        self._client_read = client_read
        self._broker_read = broker_read

        # ensure the broker has set up the connection and sends its ack
        self._client_read.recv()

    def process(self):
        """Only start the loop once the env is running."""
        _thread = threading.Thread(
            target=self._message_handler, name=str(self.source), daemon=True
        )
        _thread.start()
        self.agent.register_thread(thread=_thread)
        yield self.env.event()

    def _message_handler(self):
        """Reads messages from the broker pipe and forwards them to the agent.

        Runs until the pipe is closed, either by the other side (EOFError) or
        locally through ``terminate`` (OSError on a closed handle).
        """
        while True:
            try:
                msg = self._client_read.recv()
            except (EOFError, OSError):
                # EOFError: nothing left to receive and the other end was
                # closed. OSError: our own end was closed by terminate().
                # Both mean there is nothing more to read.
                break
            variable = AgentVariable.from_json(msg)
            self.logger.debug(f"Received variable {variable.name}.")
            self.agent.data_broker.send_variable(variable)

    def terminate(self):
        """Closes all of the connections."""
        # Terminating is important when running multiple
        # simulations/environments, otherwise the broker will keep spamming all
        # agents from the previous simulation, potentially filling their queues.
        for connection in (
            self._client_write,
            self._client_read,
            self._broker_write,
            self._broker_read,
        ):
            connection.close()
        super().terminate()

    def _send(self, payload: CommunicationDict):
        """Sends a variable to the Broker.

        Args:
            payload: The communication dict to send; its ``"source"`` entry is
                used as the agent id of the message.
        """
        agent_id = payload["source"]
        msg = multi_processing_broker.Message(
            agent_id=agent_id, payload=self.to_json(payload)
        )
        self._client_write.send(msg)