Coverage for agentlib/modules/communicator/local_multiprocessing.py: 96%

55 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1import time 

2import multiprocessing 

3import threading 

4 

5from pydantic import Field 

6from ipaddress import IPv4Address 

7 

8from agentlib.core import Agent 

9from agentlib.core.datamodels import AgentVariable 

10from agentlib.modules.communicator.communicator import ( 

11 Communicator, 

12 CommunicationDict, 

13 CommunicatorConfig, 

14) 

15from agentlib.utils import multi_processing_broker 

16 

17 

class MultiProcessingBroadcastClientConfig(CommunicatorConfig):
    """Connection settings of the multiprocessing broadcast client.

    Bundles the host, port and authorization key that the client uses when
    it connects to the broker.
    """

    # Host of the communication server; localhost unless configured.
    ipv4: IPv4Address = Field(
        description=(
            "IP Address for the communication server. "
            "Defaults to localhost."
        ),
        default="127.0.0.1",
    )
    # Port on which the connection with the broker is set up.
    port: int = Field(
        description="Port for setting up the connection with the broker.",
        default=50_000,
    )
    # Shared secret handed to the broker manager as its ``authkey``.
    authkey: bytes = Field(
        description="Authorization key for the connection with the broker.",
        default=b"useTheAgentlib",
    )

31 

32 

class MultiProcessingBroadcastClient(Communicator):
    """
    This communicator implements the communication between agents via a
    central process broker.

    On construction the client connects to the broker manager, creates two
    one-directional pipes and hands the broker-side ends to the broker via
    its sign-up queue. Incoming messages are read in a background thread
    started by :meth:`process`; outgoing messages are written by
    :meth:`_send`.
    """

    config: MultiProcessingBroadcastClientConfig

    def __init__(self, config: dict, agent: Agent):
        """Connect to the broker and register this agent with it.

        Args:
            config: Configuration of this module.
            agent: The agent this communicator belongs to.

        Raises:
            RuntimeError: If the broker keeps refusing the connection for
                more than 10 seconds.
        """
        super().__init__(config=config, agent=agent)
        manager = multi_processing_broker.BrokerManager(
            # The socket layer expects the host as a plain string, while the
            # config field may hold an ``IPv4Address`` object.
            address=(str(self.config.ipv4), self.config.port),
            authkey=self.config.authkey,
        )
        # Monotonic clock: the timeout must not be affected by adjustments
        # of the system wall clock.
        started_wait = time.monotonic()

        while True:
            try:
                manager.connect()
                break
            except ConnectionRefusedError as err:
                # The broker may not be listening yet; retry shortly.
                time.sleep(0.01)
                if time.monotonic() - started_wait > 10:
                    raise RuntimeError("Could not connect to the server.") from err

        signup_queue = manager.get_queue()
        # Two one-directional pipes: ``Pipe(duplex=False)`` returns the
        # (receiving end, sending end) pair.
        client_read, broker_write = multiprocessing.Pipe(duplex=False)
        broker_read, client_write = multiprocessing.Pipe(duplex=False)
        signup = multi_processing_broker.MPClient(
            agent_id=self.agent.id, read=broker_read, write=broker_write
        )

        signup_queue.put(signup)

        self._client_write = client_write
        self._broker_write = broker_write
        self._client_read = client_read
        self._broker_read = broker_read

        # ensure the broker has set up the connection and sends its ack
        self._client_read.recv()

    def process(self):
        """Only start the loop once the env is running."""
        _thread = threading.Thread(
            target=self._message_handler, name=str(self.source), daemon=True
        )
        _thread.start()
        self.agent.register_thread(thread=_thread)
        # Wait on an event that is never triggered here, so this process
        # stays alive while the reader thread does the work.
        yield self.env.event()

    def _message_handler(self):
        """Reads messages that were put in the message queue.

        Runs until the connection is no longer usable: ``EOFError`` means
        the sending end was closed, ``OSError`` is raised when the
        connection was closed locally (e.g. by :meth:`terminate`).
        """
        while True:
            try:
                msg = self._client_read.recv()
            except (EOFError, OSError):
                self.logger.debug("Connection closed, stopping message handler.")
                break
            variable = AgentVariable.from_json(msg)
            self.logger.debug(f"Received variable {variable.name}.")
            self.agent.data_broker.send_variable(variable)

    def terminate(self):
        """Closes all of the connections."""
        # Terminating is important when running multiple
        # simulations/environments, otherwise the broker will keep spamming all
        # agents from the previous simulation, potentially filling their queues.
        try:
            for connection in (
                self._client_write,
                self._client_read,
                self._broker_write,
                self._broker_read,
            ):
                connection.close()
        finally:
            # Always run the base-class cleanup, even if closing a
            # connection failed.
            super().terminate()

    def _send(self, payload: CommunicationDict):
        """Sends a variable to the Broker.

        Args:
            payload: The message to send; its ``"source"`` entry is used as
                the agent id of the message.
        """
        agent_id = payload["source"]
        msg = multi_processing_broker.Message(
            agent_id=agent_id, payload=self.to_json(payload)
        )
        self._client_write.send(msg)

111 self._client_write.send(msg)