Coverage for agentlib/utils/multi_agent_system.py: 61%

140 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1""" 

2Module containing a local agency to test any LocalMASAgency system 

3without the need of cloneMAP. 

4""" 

5 

6import abc 

7import json 

8import logging 

9import multiprocessing 

10import threading 

11from pathlib import Path 

12from typing import List, Dict, Union, Any 

13 

14import pandas as pd 

15from pydantic import ( 

16 field_validator, 

17 ConfigDict, 

18 BaseModel, 

19 PrivateAttr, 

20 Field, 

21 FilePath, 

22) 

23 

24from agentlib.core import Agent, Environment 

25from agentlib.core.agent import AgentConfig 

26from agentlib.utils.load_config import load_config 

27 

28logger = logging.getLogger(__name__) 

29 

30 

class MAS(BaseModel):
    """Common base class for the multi-agent-system (MAS) helpers.

    During construction every entry of ``agent_configs`` is loaded through
    ``load_config`` (see ``setup_agents``) and afterwards registered via
    ``add_agent``. Concrete subclasses have to implement ``run``.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    agent_configs: List[Union[dict, FilePath, str]]
    env: Union[Environment, dict, FilePath] = Field(
        default_factory=Environment,
        title="env",
        description="The environment for the agents.",
    )
    variable_logging: bool = Field(
        default=False,
        title="variable_logging",
        description="Enable variable logging in all agents with sampling rate of environment.",
    )
    # Registered agent configs, keyed by the agent id.
    _agent_configs: Dict[str, AgentConfig] = PrivateAttr(default={})

    def __init__(self, **data: Any) -> None:
        """Validate the model fields, then register each configured agent."""
        super().__init__(**data)
        for loaded_config in self.agent_configs:
            self.add_agent(config=loaded_config)

    @field_validator("agent_configs")
    @classmethod
    def setup_agents(cls, agent_configs):
        """Turn every given entry into an ``AgentConfig`` via ``load_config``."""
        return [
            load_config(raw_config, config_type=AgentConfig)
            for raw_config in agent_configs
        ]

    def add_agent(self, config: AgentConfig):
        """
        Register an agent config in this MAS.

        If ``variable_logging`` is enabled, an AgentLogger module entry is
        appended to the config first. Its sampling time is taken from the
        environment: ``t_sample`` of the dict (falling back to 60) when the
        environment is a dict, otherwise ``env.config.t_sample``.

        Args:
            config AgentConfig: agent config to register. A copy of it is
                stored under its id.
        """
        if self.variable_logging:
            environment = self.env
            if isinstance(environment, dict):
                sampling = environment.get("t_sample", 60)
            else:
                sampling = environment.config.t_sample
            config = self.add_agent_logger(config=config, sampling=sampling)

        stored_config = config.model_copy()
        self._agent_configs[config.id] = stored_config
        logger.info("Registered agent %s in agency", config.id)

    @staticmethod
    def add_agent_logger(config: AgentConfig, sampling=60) -> AgentConfig:
        """Append an AgentLogger module entry to the modules of a config.

        The passed config is modified in place and returned.

        Args:
            config AgentConfig: The config to be updated
            sampling: Value written to the ``t_sample`` entry of the logger
                module config.

        Returns:
            AgentConfig: the same config object, now including the logger.
        """
        config.modules.append(
            {
                "module_id": "AgentLogger",
                "type": "AgentLogger",
                "t_sample": sampling,
                "values_only": True,
                "filename": f"variable_logs//Agent_{config.id}_Logger.log",
                "overwrite_log": True,
                "clean_up": False,
            }
        )
        return config

    @abc.abstractmethod
    def run(self, until):
        """
        Run the MAS. Has to be implemented by subclasses.

        Args:
            until: The time until which the simulation should run.

        Raises:
            NotImplementedError: always, in this parent class.
        """
        raise NotImplementedError("'run' is not implemented by the parent class MAS.")

118 

119 

class LocalMASAgency(MAS):
    """
    Local agency class which holds the agents in a common environment,
    executes and terminates them.

    Can be used as a context manager; leaving the ``with`` block stops the
    agency (i.e. terminates all agents).
    """

    # Instantiated agents, keyed by the agent id.
    _agents: Dict[str, Agent] = PrivateAttr(default={})

    @field_validator("env")
    @classmethod
    def setup_env(cls, env):
        """Build the ``Environment`` if only a config was given.

        Args:
            env: Either an ``Environment`` (returned unchanged), a path to an
                existing JSON file (its content is loaded and used as
                config), or any other value, which is passed as ``config``
                to ``Environment`` as is.

        Returns:
            Environment: the environment shared by all agents.
        """
        if isinstance(env, Environment):
            return env
        if isinstance(env, (Path, str)):
            if Path(env).exists():
                with open(env, "r") as f:
                    env = json.load(f)
        return Environment(config=env)

    def add_agent(self, config: AgentConfig):
        """Register the config and directly instantiate the agent as well."""
        super().add_agent(config=config)
        self.setup_agent(id=config.id)

    def stop_agency(self):
        """Stop the agency by terminating all agents."""
        logger.info("Stopping agency")
        self.terminate_agents()

    def run(self, until):
        """Execute the LocalMASAgency and terminate it after run is finished.

        Args:
            until: The time until which the environment should run.
        """
        self.env.run(until=until)
        self.stop_agency()

    def __enter__(self):
        """Enable 'with' statement"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """On exit in 'with' statement, stop the agency"""
        self.stop_agency()

    def terminate_agents(self):
        """Call ``terminate`` on every agent of the agency."""
        logger.info("Terminating all agent modules")
        for agent in self._agents.values():
            agent.terminate()

    def setup_agent(self, id: str) -> Agent:
        """Instantiate the agent for the registered config with the given id.

        Args:
            id: Id of a config previously registered via ``add_agent``.

        Returns:
            Agent: the created agent, which is also stored in the agency.
        """
        # pylint: disable=redefined-builtin
        agent = Agent(env=self.env, config=self._agent_configs[id])
        self._agents[agent.id] = agent
        return agent

    def get_agent(self, id: str) -> Agent:
        """Get the agent matching the given id.

        Args:
            id: Id of the requested agent.

        Returns:
            Agent: the agent stored under ``id``.

        Raises:
            KeyError: if no agent with the given id exists in the agency.
        """
        # pylint: disable=redefined-builtin
        try:
            return self._agents[id]
        except KeyError as err:
            # The exception has to be raised; merely constructing it would
            # silently return None for unknown ids.
            raise KeyError(f"Given id '{id}' is not in the set of agents.") from err

    def get_results(self, cleanup: bool = True) -> Dict[str, pd.DataFrame]:
        """
        Get all results of the agentLogger

        Args:
            cleanup: If true, read files are deleted.

        Returns:
            Dict[str, pd.DataFrame]: key is the agent_id, value the dataframe
        """
        results = {}
        for agent in self._agents.values():
            new_res = agent.get_results(cleanup=cleanup)
            results[agent.id] = new_res
        return results

198 

199 

class LocalCloneMAPAgency(LocalMASAgency):
    """
    Local agency class which tries to mimic cloneMAP
    behaviour for the local execution.

    Every agent gets its own ``Environment`` and is run in its own thread,
    which is started as soon as the agent is set up.
    """

    # todo-fwu delete or add to clonemap example. But I dont think we need the threads, since we have simpy

    def run(self, until=None):
        """Do nothing; the agent threads are started in ``setup_agent``."""
        pass  # Already running

    def terminate_agents(self):
        """Only log a message; terminating agents is not supported here."""
        logger.info("Can't terminate agents yet in this MAS")

    def setup_agent(self, id: str):
        """Start a thread running the agent matching the given id.

        Args:
            id: Id of a config previously registered via ``add_agent``.
        """

        # pylint: disable=redefined-builtin
        def _get_ag(env, ag_config):
            # Thread target: create the agent in a fresh environment built
            # from the given config and run that environment.
            ag = Agent(env=Environment(config=env), config=ag_config)
            ag.env.run()
            return ag

        thread = threading.Thread(
            target=_get_ag,
            kwargs={
                "env": self.env.config.model_copy(),
                # model_copy() instead of the deprecated pydantic-v1 copy(),
                # consistent with the other config copies in this module.
                "ag_config": self._agent_configs[id].model_copy(),
            },
        )
        thread.start()
        # NOTE(review): the thread, not an Agent, is stored here, so the
        # inherited get_results / get_agent will hand out Thread objects.
        self._agents[id] = thread

233 

234 

def agent_process(
    agent_config: Union[dict, FilePath],
    until: float,
    env: Union[dict, FilePath],
    results_dict: dict,
    cleanup=True,
    log_level=logging.ERROR,
):
    """
    Entry point for running a single agent inside its own process.

    Configures logging, builds the environment and the agent, runs the
    agent's environment, fetches the agent's results, terminates all of its
    modules and finally stores the results in ``results_dict`` under the
    agent id.

    Args:
        agent_config: Config for an agent.
        until: Simulation runtime
        env: config for an environment
        results_dict: dict from process manager
        cleanup: Forwarded to ``agent.get_results``.
        log_level: the log level for this process

    Returns:
        None. The results are written into ``results_dict``.
    """
    logging.basicConfig(level=log_level)

    environment = Environment(config=env)
    agent = Agent(config=agent_config, env=environment)
    agent.env.run(until=until)

    # Fetch the results before the modules are terminated.
    agent_results = agent.get_results(cleanup)
    for module in agent.modules:
        module.terminate()

    results_dict[agent.id] = agent_results

266 

267 

class MultiProcessingMAS(MAS):
    """
    Helper class to conveniently run multi-agent-systems in separate processes.

    Every registered agent is executed by ``agent_process`` in its own
    ``multiprocessing.Process``. The environment is kept as a plain config
    and each process builds its own ``Environment`` from it.
    """

    env: Union[dict, FilePath] = Field(
        # Defaults are not passed through ``setup_env`` (pydantic only
        # validates defaults if ``validate_default`` is set), so the default
        # is dumped to a config dict here, just like the validator does for
        # a given Environment. Otherwise an Environment instance instead of
        # a config would be handed to the agent processes.
        default_factory=lambda: Environment(config={"rt": True}).config.model_dump(),
        title="env",
        description="The environment for the agents.",
    )
    cleanup: bool = Field(
        default=False,
        description="Whether agents should clean the results files after running.",
    )
    log_level: int = Field(
        default=logging.ERROR, description="Loglevel to set for the processes."
    )

    # Processes started by ``run``.
    _processes: List[multiprocessing.Process] = PrivateAttr(default=[])
    # Results per agent id; replaced by a manager dict in ``__init__``.
    _results_dict: Dict[str, pd.DataFrame] = PrivateAttr(default={})

    @field_validator("env")
    @classmethod
    def setup_env(cls, env):
        """Normalise the given environment to a config dict.

        An ``Environment`` is dumped to its config dict, an existing JSON
        file is loaded. ``rt`` is set to True if it is missing.

        Args:
            env: Environment, config dict or path to a JSON config file.

        Returns:
            dict: the environment config.

        Raises:
            ValueError: if the config explicitly disables ``rt``.
        """
        if isinstance(env, Environment):
            env = env.config.model_dump()
        elif isinstance(env, (Path, str)):
            if Path(env).exists():
                with open(env, "r") as f:
                    env = json.load(f)
        # Explicit check instead of ``assert``: with ``python -O`` an assert
        # would be stripped, including the ``setdefault`` side effect.
        if not env.setdefault("rt", True):
            raise ValueError(
                "Synchronization between processes relies on time, RealTimeEnvironment "
                "is required."
            )
        return env

    def __init__(self, **data: Any) -> None:
        """Set up the MAS and a manager dict to collect the process results."""
        super().__init__(**data)
        manager = multiprocessing.Manager()
        self._results_dict = manager.dict()

    def run(self, until):
        """Execute the multi-agent-system in parallel and terminate it after
        run is finished.

        One process per registered agent is started. The call blocks until
        all processes have been joined.

        Args:
            until: The time until which each agent's environment should run.
        """
        for agent in self._agent_configs.values():
            kwargs = {
                "agent_config": agent,
                "until": until,
                "env": self.env,
                "results_dict": self._results_dict,
                "cleanup": self.cleanup,
                "log_level": self.log_level,
            }
            process = multiprocessing.Process(
                target=agent_process, name=agent.id, kwargs=kwargs
            )
            self._processes.append(process)
            process.start()
        for process in self._processes:
            process.join()

    def get_results(self) -> Dict[str, pd.DataFrame]:
        """
        Get all results of the agentLogger

        Returns:
            Dict[str, pd.DataFrame]: key is the agent_id, value the dataframe
        """
        # Copy the manager dict into a regular dict.
        return dict(self._results_dict)