Coverage for agentlib/utils/multi_agent_system.py: 62%

144 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-11-07 11:57 +0000

1""" 

2Module containing a local agency to test any LocalMASAgency system 

3without the need of cloneMAP. 

4""" 

5 

6import abc 

7import json 

8import logging 

9import multiprocessing 

10import threading 

11from pathlib import Path 

12from typing import List, Dict, Union, Any 

13 

14import pandas as pd 

15from pydantic import ( 

16 field_validator, 

17 ConfigDict, 

18 BaseModel, 

19 PrivateAttr, 

20 Field, 

21 FilePath, 

22) 

23 

24from agentlib.core import Agent, Environment 

25from agentlib.core.agent import AgentConfig 

26from agentlib.utils.load_config import load_config 

27 

28logger = logging.getLogger(__name__) 

29 

30 

31class MAS(BaseModel): 

32 """Parent class for all MAS""" 

33 

34 model_config = ConfigDict(arbitrary_types_allowed=True) 

35 

36 agent_configs: List[Union[dict, FilePath, str]] 

37 env: Union[Environment, dict, FilePath] = Field( 

38 default_factory=Environment, 

39 title="env", 

40 description="The environment for the agents.", 

41 ) 

42 variable_logging: bool = Field( 

43 default=False, 

44 title="variable_logging", 

45 description="Enable variable logging in all agents with sampling rate of environment.", 

46 ) 

47 use_direct_callback_databroker: bool = Field( 

48 default=False, 

49 description="If True, the `DirectCallbackDataBroker` will be used in all agents" 

50 ) 

51 _agent_configs: Dict[str, AgentConfig] = PrivateAttr(default={}) 

52 

53 def __init__(self, **data: Any) -> None: 

54 """Add all agents as Agent object""" 

55 super().__init__(**data) 

56 for agent_config in self.agent_configs: 

57 self.add_agent(config=agent_config) 

58 

59 @field_validator("agent_configs") 

60 @classmethod 

61 def setup_agents(cls, agent_configs): 

62 """Load agent configs and add them.""" 

63 cfgs = [] 

64 for cfg in agent_configs: 

65 cfgs.append(load_config(cfg, config_type=AgentConfig)) 

66 return cfgs 

67 

68 def add_agent(self, config: AgentConfig): 

69 """ 

70 Add an agent to the local agency with the 

71 given agent config. 

72 

73 Args: 

74 config Dict: agent config 

75 """ 

76 

77 if self.variable_logging: 

78 if isinstance(self.env, dict): 

79 config = self.add_agent_logger( 

80 config=config, sampling=self.env.get("t_sample", 60) 

81 ) 

82 else: 

83 config = self.add_agent_logger( 

84 config=config, sampling=self.env.config.t_sample 

85 ) 

86 if config.use_direct_callback_databroker and not self.use_direct_callback_databroker: 

87 logger.warning( 

88 "Agent %s explicitly sets use_direct_callback_databroker=True, " 

89 "won't apply the MAS.use_direct_callback_databroker=False setting.", 

90 config.id 

91 ) 

92 else: 

93 config.use_direct_callback_databroker = self.use_direct_callback_databroker 

94 

95 self._agent_configs[config.id] = config.model_copy() 

96 logger.info("Registered agent %s in agency", config.id) 

97 

98 @staticmethod 

99 def add_agent_logger(config: AgentConfig, sampling=60) -> AgentConfig: 

100 """Adds the AgentLogger to the list of configs. 

101 

102 Args: 

103 config dict: The config to be updated 

104 sampling= 

105 """ 

106 # Add Logger config 

107 filename = f"variable_logs//Agent_{config.id}_Logger.log" 

108 cfg = { 

109 "module_id": "AgentLogger", 

110 "type": "AgentLogger", 

111 "t_sample": sampling, 

112 "values_only": True, 

113 "filename": filename, 

114 "overwrite_log": True, 

115 "clean_up": False, 

116 } 

117 config.modules.append(cfg) 

118 return config 

119 

120 @abc.abstractmethod 

121 def run(self, until): 

122 """ 

123 Run the MAS. 

124 Args: 

125 until: The time until which the simulation should run. 

126 

127 Returns: 

128 

129 """ 

130 raise NotImplementedError("'run' is not implemented by the parent class MAS.") 

131 

132 

133class LocalMASAgency(MAS): 

134 """ 

135 Local LocalMASAgency agency class which holds the agents in a common environment, 

136 executes and terminates them. 

137 """ 

138 

139 _agents: Dict[str, Agent] = PrivateAttr(default={}) 

140 

141 @field_validator("env") 

142 @classmethod 

143 def setup_env(cls, env): 

144 """Setup the env if a config is given.""" 

145 if isinstance(env, Environment): 

146 return env 

147 if isinstance(env, (Path, str)): 

148 if Path(env).exists(): 

149 with open(env, "r") as f: 

150 env = json.load(f) 

151 return Environment(config=env) 

152 

153 def add_agent(self, config: AgentConfig): 

154 """Also setup the agent directly""" 

155 super().add_agent(config=config) 

156 self.setup_agent(id=config.id) 

157 

158 def stop_agency(self): 

159 """Stop all threads""" 

160 logger.info("Stopping agency") 

161 self.terminate_agents() 

162 

163 def run(self, until): 

164 """Execute the LocalMASAgency and terminate it after run is finished""" 

165 self.env.run(until=until) 

166 self.stop_agency() 

167 

168 def __enter__(self): 

169 """Enable 'with' statement""" 

170 return self 

171 

172 def __exit__(self, exc_type, exc_val, exc_tb): 

173 """On exit in 'with' statement, stop the agency""" 

174 self.stop_agency() 

175 

176 def terminate_agents(self): 

177 """Terminate all agents modules.""" 

178 logger.info("Terminating all agent modules") 

179 for agent in self._agents.values(): 

180 agent.terminate() 

181 

182 def setup_agent(self, id: str) -> Agent: 

183 """Setup the agent matching the given id""" 

184 # pylint: disable=redefined-builtin 

185 agent = Agent(env=self.env, config=self._agent_configs[id]) 

186 self._agents[agent.id] = agent 

187 return agent 

188 

189 def get_agent(self, id: str) -> Agent: 

190 """Get the agent matching the given id""" 

191 # pylint: disable=redefined-builtin, inconsistent-return-statements 

192 try: 

193 return self._agents[id] 

194 except KeyError: 

195 KeyError(f"Given id '{id}' is not in the set of agents.") 

196 

197 def get_results(self, cleanup: bool = False) -> Dict[str, pd.DataFrame]: 

198 """ 

199 Get all results of the agentLogger 

200 Args: 

201 cleanup: If true, read files are deleted. 

202 

203 Returns: 

204 Dict[str, pd.DataFrame]: key is the agent_id, value the dataframe 

205 """ 

206 results = {} 

207 for agent in self._agents.values(): 

208 new_res = agent.get_results(cleanup=cleanup) 

209 results[agent.id] = new_res 

210 return results 

211 

212 

213class LocalCloneMAPAgency(LocalMASAgency): 

214 """ 

215 Local LocalMASAgency agency class which tries to mimic cloneMAP 

216 behaviour for the local execution. 

217 """ 

218 

219 # todo-fwu delete or add to clonemap example. But I dont think we need the threads, since we have simpy 

220 

221 def run(self, until=None): 

222 pass # Already running 

223 

224 def terminate_agents(self): 

225 """Terminate all agents modules.""" 

226 logger.info("Can't terminate agents yet in this MAS") 

227 

228 def setup_agent(self, id: str): 

229 """Setup the agent matching the given id""" 

230 

231 # pylint: disable=redefined-builtin 

232 def _get_ag(env, ag_config): 

233 ag = Agent(env=Environment(config=env), config=ag_config) 

234 ag.env.run() 

235 return ag 

236 

237 thread = threading.Thread( 

238 target=_get_ag, 

239 kwargs={ 

240 "env": self.env.config.model_copy(), 

241 "ag_config": self._agent_configs[id].copy(), 

242 }, 

243 ) 

244 thread.start() 

245 self._agents[id] = thread 

246 

247 

248def agent_process( 

249 agent_config: Union[dict, FilePath], 

250 until: float, 

251 env: Union[dict, FilePath], 

252 results_dict: dict, 

253 cleanup=True, 

254 log_level=logging.ERROR, 

255): 

256 """ 

257 Function to initialize and start an agent in its own process. 

258 Collects results from the agent and stores them 

259 in the passed results_dict. 

260 Args: 

261 cleanup: 

262 agent_config: Config for an agent. 

263 until: Simulation runtime 

264 env: config for an environment 

265 results_dict: dict from process manager 

266 log_level: the log level for this process 

267 

268 Returns: 

269 

270 """ 

271 logging.basicConfig(level=log_level) 

272 env = Environment(config=env) 

273 agent = Agent(config=agent_config, env=env) 

274 agent.env.run(until=until) 

275 results = agent.get_results(cleanup) 

276 for mod in agent.modules: 

277 mod.terminate() 

278 results_dict[agent.id] = results 

279 

280 

281class MultiProcessingMAS(MAS): 

282 """ 

283 Helper class to conveniently run multi-agent-systems in separate processes. 

284 """ 

285 

286 env: Union[dict, FilePath] = Field( 

287 default_factory=lambda: Environment(config={"rt": True}), 

288 title="env", 

289 description="The environment for the agents.", 

290 ) 

291 cleanup: bool = Field( 

292 default=False, 

293 description="Whether agents should clean the results files after " "running.", 

294 ) 

295 log_level: int = Field( 

296 default=logging.ERROR, description="Loglevel to set for the processes." 

297 ) 

298 

299 _processes: List[multiprocessing.Process] = PrivateAttr(default=[]) 

300 _results_dict: Dict[str, pd.DataFrame] = PrivateAttr(default={}) 

301 

302 @field_validator("env") 

303 @classmethod 

304 def setup_env(cls, env): 

305 """Setup the env if a config is given.""" 

306 if isinstance(env, Environment): 

307 env = env.config.model_dump() 

308 elif isinstance(env, (Path, str)): 

309 if Path(env).exists(): 

310 with open(env, "r") as f: 

311 env = json.load(f) 

312 assert env.setdefault("rt", True), ( 

313 "Synchronization between processes relies on time, RealTimeEnvironment " 

314 "is required." 

315 ) 

316 return env 

317 

318 def __init__(self, **data: Any) -> None: 

319 super().__init__(**data) 

320 manager = multiprocessing.Manager() 

321 self._results_dict = manager.dict() 

322 

323 def run(self, until): 

324 """Execute the multi-agent-system in parallel and terminate it after 

325 run is finished""" 

326 for agent in self._agent_configs.values(): 

327 kwargs = { 

328 "agent_config": agent, 

329 "until": until, 

330 "env": self.env, 

331 "results_dict": self._results_dict, 

332 "cleanup": self.cleanup, 

333 "log_level": self.log_level, 

334 } 

335 process = multiprocessing.Process( 

336 target=agent_process, name=agent.id, kwargs=kwargs 

337 ) 

338 self._processes.append(process) 

339 process.start() 

340 for process in self._processes: 

341 process.join() 

342 

343 def get_results(self) -> Dict[str, pd.DataFrame]: 

344 """ 

345 Get all results of the agentLogger 

346 Returns: 

347 Dict[str, pd.DataFrame]: key is the agent_id, value the dataframe 

348 """ 

349 return dict(self._results_dict)