Coverage for agentlib/modules/communicator/communicator.py: 71%

112 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1""" 

2Module contains basics communicator modules 

3""" 

4 

5import abc 

6import json 

7import queue 

8import threading 

9from typing import Union, List, TypedDict, Any 

10 

11import pandas as pd 

12from pydantic import Field, field_validator 

13 

14from agentlib.core import Agent, BaseModule, BaseModuleConfig 

15from agentlib.core.datamodels import AgentVariable 

16from agentlib.core.errors import OptionalDependencyError 

17from agentlib.utils.broker import Broker 

18from agentlib.utils.validators import convert_to_list 

19 

20 

21class CommunicationDict(TypedDict): 

22 alias: str 

23 value: Any 

24 timestamp: float 

25 type: str 

26 source: str 

27 

28 

29class CommunicatorConfig(BaseModuleConfig): 

30 use_orjson: bool = Field( 

31 title="Use orjson", 

32 default=False, 

33 description="If true, the faster orjson library will be used for serialization " 

34 "deserialization. Requires the optional dependency.", 

35 ) 

36 

37 

38class SubscriptionCommunicatorConfig(CommunicatorConfig): 

39 subscriptions: Union[List[str], str] = Field( 

40 title="Subscriptions", 

41 default=[], 

42 description="List of agent-id strings to subscribe to", 

43 ) 

44 check_subscriptions = field_validator("subscriptions")(convert_to_list) 

45 

46 

47class Communicator(BaseModule): 

48 """ 

49 Base class for all communicators 

50 """ 

51 

52 config: CommunicatorConfig 

53 

54 def __init__(self, *, config: dict, agent: Agent): 

55 super().__init__(config=config, agent=agent) 

56 

57 if self.config.use_orjson: 

58 try: 

59 import orjson 

60 except ImportError: 

61 raise OptionalDependencyError( 

62 dependency_name="orjson", 

63 dependency_install="orjson", 

64 used_object="Communicator with 'use_orjson=True'", 

65 ) 

66 

67 def _to_orjson(payload: CommunicationDict) -> bytes: 

68 return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY) 

69 

70 self.to_json = _to_orjson 

71 else: 

72 

73 def _to_json_builtin(payload: CommunicationDict) -> str: 

74 return json.dumps(payload) 

75 

76 self.to_json = _to_json_builtin 

77 

78 def register_callbacks(self): 

79 """Register all outputs to the callback function""" 

80 self.agent.data_broker.register_callback( 

81 callback=self._send_only_shared_variables, _unsafe_no_copy=True 

82 ) 

83 

84 def process(self): 

85 yield self.env.event() 

86 

87 def _send_only_shared_variables(self, variable: AgentVariable): 

88 """Send only variables with field ``shared=True``""" 

89 if not self._variable_can_be_send(variable): 

90 return 

91 

92 payload = self.short_dict(variable) 

93 self.logger.debug("Sending variable %s=%s", variable.alias, variable.value) 

94 self._send(payload=payload) 

95 

96 def _variable_can_be_send(self, variable): 

97 return variable.shared and ( 

98 (variable.source.agent_id is None) 

99 or (variable.source.agent_id == self.agent.id) 

100 ) 

101 

102 @abc.abstractmethod 

103 def _send(self, payload: CommunicationDict): 

104 raise NotImplementedError( 

105 "This method needs to be implemented " "individually for each communicator" 

106 ) 

107 

108 def short_dict(self, variable: AgentVariable) -> CommunicationDict: 

109 """Creates a short dict serialization of the Variable. 

110 

111 Only contains attributes of the AgentVariable, that are relevant for other 

112 modules or agents. For performance and privacy reasons, this function should 

113 be called for communicators.""" 

114 if isinstance(variable.value, pd.Series): 

115 value = variable.value.to_json() 

116 else: 

117 value = variable.value 

118 return CommunicationDict( 

119 alias=variable.alias, 

120 value=value, 

121 timestamp=variable.timestamp, 

122 type=variable.type, 

123 source=self.agent.id, 

124 ) 

125 

126 def to_json(self, payload: CommunicationDict) -> Union[bytes, str]: 

127 """Transforms the payload into json serialized form. Dynamically uses orjson 

128 if it is installed, and the builtin json otherwise. 

129 

130 Returns bytes or str depending on the library used, but this has not mattered 

131 with the communicators as of now. 

132 """ 

133 # implemented on init 

134 pass 

135 

136 

137class LocalCommunicatorConfig(CommunicatorConfig): 

138 parse_json: bool = Field( 

139 title="Indicate whether variables are converted to json before sending. " 

140 "Increasing computing time but makes MAS more close to later stages" 

141 "which use MQTT or similar.", 

142 default=False, 

143 ) 

144 

145 

146class LocalCommunicator(Communicator): 

147 """ 

148 Base class for local communicators. 

149 """ 

150 

151 config: LocalCommunicatorConfig 

152 

153 def __init__(self, config: dict, agent: Agent): 

154 # assign methods to receive messages either in realtime or in the 

155 # simpy process. Has to be done before calling super().__init__() 

156 # because that already calls the process method 

157 if agent.env.config.rt: 

158 self.process = self._process_realtime 

159 self.receive = self._receive_realtime 

160 self._loop = None 

161 else: 

162 self._received_variable = agent.env.event() 

163 self.process = self._process 

164 self.receive = self._receive 

165 

166 super().__init__(config=config, agent=agent) 

167 self.broker = self.setup_broker() 

168 self._msg_q_in = queue.Queue(100) 

169 self.broker.register_client(client=self) 

170 

171 @property 

172 def broker(self) -> Broker: 

173 """Broker used by LocalCommunicator""" 

174 return self._broker 

175 

176 @broker.setter 

177 def broker(self, broker): 

178 """Set the broker of the LocalCommunicator""" 

179 self._broker = broker 

180 self.logger.info("%s uses broker %s", self.__class__.__name__, self.broker) 

181 

182 @abc.abstractmethod 

183 def setup_broker(self): 

184 """Function to set up the broker object. 

185 Needs to return a valid broker option.""" 

186 raise NotImplementedError( 

187 "This method needs to be implemented " "individually for each communicator" 

188 ) 

189 

190 def _process(self): 

191 """Waits for new messages, sends them to the broker.""" 

192 yield self.env.event() 

193 

194 def _process_realtime(self): 

195 """Only start the loop once the env is running.""" 

196 self._loop = threading.Thread( 

197 target=self._message_handler, name=str(self.source) 

198 ) 

199 self._loop.daemon = True # Necessary to enable terminations of scripts 

200 self._loop.start() 

201 self.agent.register_thread(thread=self._loop) 

202 yield self.env.event() 

203 

204 def _send_simpy(self, ignored): 

205 """Sends new messages to the broker when receiving them, adhering to the 

206 simpy event queue. To be appended to a simpy event callback.""" 

207 variable = self._msg_q_in.get_nowait() 

208 self.agent.data_broker.send_variable(variable) 

209 

210 def _receive(self, msg_obj): 

211 """Receive a given message and put it in the queue and set the 

212 corresponding simpy event.""" 

213 if self.config.parse_json: 

214 variable = AgentVariable.from_json(msg_obj) 

215 else: 

216 variable = msg_obj 

217 self._msg_q_in.put(variable, block=False) 

218 self._received_variable.callbacks.append(self._send_simpy) 

219 self._received_variable.succeed() 

220 self._received_variable = self.env.event() 

221 

222 def _receive_realtime(self, msg_obj): 

223 """Receive a given message and put it in the queue. No event setting 

224 is required for realtime.""" 

225 if self.config.parse_json: 

226 variable = AgentVariable.from_json(msg_obj) 

227 else: 

228 variable = msg_obj 

229 self._msg_q_in.put(variable) 

230 

231 def _message_handler(self): 

232 """Reads messages that were put in the message queue.""" 

233 while True: 

234 variable = self._msg_q_in.get() 

235 self.agent.data_broker.send_variable(variable) 

236 

237 def terminate(self): 

238 # Terminating is important when running multiple 

239 # simulations/environments, otherwise the broker will keep spamming all 

240 # agents from the previous simulation, potentially filling their queues. 

241 self.broker.delete_client(self) 

242 super().terminate()