Coverage for agentlib/utils/multi_agent_system.py: 61%
140 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1"""
2Module containing a local agency to test any LocalMASAgency system
3without the need of cloneMAP.
4"""
6import abc
7import json
8import logging
9import multiprocessing
10import threading
11from pathlib import Path
12from typing import List, Dict, Union, Any
14import pandas as pd
15from pydantic import (
16 field_validator,
17 ConfigDict,
18 BaseModel,
19 PrivateAttr,
20 Field,
21 FilePath,
22)
24from agentlib.core import Agent, Environment
25from agentlib.core.agent import AgentConfig
26from agentlib.utils.load_config import load_config
28logger = logging.getLogger(__name__)
class MAS(BaseModel):
    """Base model shared by all multi-agent-system (MAS) helpers.

    Holds the raw agent configs, the environment (or its config) and a
    registry of the loaded ``AgentConfig`` objects keyed by agent id.
    Subclasses have to implement ``run``.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    agent_configs: List[Union[dict, FilePath, str]]
    env: Union[Environment, dict, FilePath] = Field(
        default_factory=Environment,
        title="env",
        description="The environment for the agents.",
    )
    variable_logging: bool = Field(
        default=False,
        title="variable_logging",
        description="Enable variable logging in all agents with sampling rate of environment.",
    )
    # Registry filled by ``add_agent``: agent id -> copy of its config.
    _agent_configs: Dict[str, AgentConfig] = PrivateAttr(default={})

    def __init__(self, **data: Any) -> None:
        """Validate the fields, then register every configured agent."""
        super().__init__(**data)
        for agent_config in self.agent_configs:
            self.add_agent(config=agent_config)

    @field_validator("agent_configs")
    @classmethod
    def setup_agents(cls, agent_configs):
        """Pass every entry through ``load_config`` with ``AgentConfig`` as target type."""
        return [load_config(entry, config_type=AgentConfig) for entry in agent_configs]

    def add_agent(self, config: AgentConfig):
        """
        Register an agent config in the agency.

        If ``variable_logging`` is enabled, an AgentLogger module is appended
        to the config first, sampled at the environment's ``t_sample``.

        Args:
            config AgentConfig: config of the agent to register
        """
        if self.variable_logging:
            # The env may still be a plain dict (not yet an Environment);
            # in that case fall back to 60 if no t_sample is given.
            if isinstance(self.env, dict):
                sampling = self.env.get("t_sample", 60)
            else:
                sampling = self.env.config.t_sample
            config = self.add_agent_logger(config=config, sampling=sampling)
        agent_id = config.id
        self._agent_configs[agent_id] = config.model_copy()
        logger.info("Registered agent %s in agency", agent_id)

    @staticmethod
    def add_agent_logger(config: AgentConfig, sampling=60) -> AgentConfig:
        """Append an AgentLogger module config to the given agent config.

        The passed config is modified in place (its ``modules`` list is
        appended to) and returned.

        Args:
            config AgentConfig: The config to be updated
            sampling: value used as ``t_sample`` of the logger module
        """
        logger_module = dict(
            module_id="AgentLogger",
            type="AgentLogger",
            t_sample=sampling,
            values_only=True,
            filename=f"variable_logs//Agent_{config.id}_Logger.log",
            overwrite_log=True,
            clean_up=False,
        )
        config.modules.append(logger_module)
        return config

    @abc.abstractmethod
    def run(self, until):
        """
        Run the MAS. Has to be implemented by subclasses.

        Args:
            until: The time until which the simulation should run.

        Raises:
            NotImplementedError: always, in this parent class.
        """
        raise NotImplementedError("'run' is not implemented by the parent class MAS.")
class LocalMASAgency(MAS):
    """
    Local LocalMASAgency agency class which holds the agents in a common environment,
    executes and terminates them.
    """

    # Instantiated agents, keyed by agent id (filled by ``setup_agent``).
    _agents: Dict[str, Agent] = PrivateAttr(default={})

    @field_validator("env")
    @classmethod
    def setup_env(cls, env):
        """Setup the env if a config is given.

        An ``Environment`` instance is returned unchanged. A path (``Path`` or
        ``str``) pointing to an existing file is read as JSON; anything else
        is handed to ``Environment(config=...)`` as is.
        """
        if isinstance(env, Environment):
            return env
        if isinstance(env, (Path, str)):
            if Path(env).exists():
                with open(env, "r") as f:
                    env = json.load(f)
        return Environment(config=env)

    def add_agent(self, config: AgentConfig):
        """Register the config and also set up the agent directly."""
        super().add_agent(config=config)
        self.setup_agent(id=config.id)

    def stop_agency(self):
        """Stop the agency by terminating all agents."""
        logger.info("Stopping agency")
        self.terminate_agents()

    def run(self, until):
        """Execute the LocalMASAgency and terminate it after run is finished.

        Args:
            until: The time until which the environment should run.
        """
        self.env.run(until=until)
        self.stop_agency()

    def __enter__(self):
        """Enable 'with' statement"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """On exit in 'with' statement, stop the agency"""
        self.stop_agency()

    def terminate_agents(self):
        """Terminate all agents modules."""
        logger.info("Terminating all agent modules")
        for agent in self._agents.values():
            agent.terminate()

    def setup_agent(self, id: str) -> Agent:
        """Create the agent for the registered config with the given id.

        Args:
            id: id of an agent config previously registered via ``add_agent``.

        Returns:
            Agent: the newly created agent, also stored in the agency.
        """
        # pylint: disable=redefined-builtin
        agent = Agent(env=self.env, config=self._agent_configs[id])
        self._agents[agent.id] = agent
        return agent

    def get_agent(self, id: str) -> Agent:
        """Get the agent matching the given id.

        Args:
            id: id of the requested agent.

        Returns:
            Agent: the agent stored under ``id``.

        Raises:
            KeyError: if no agent with the given id exists in the agency.
        """
        # pylint: disable=redefined-builtin
        try:
            return self._agents[id]
        except KeyError as err:
            # The exception was previously only constructed, never raised,
            # which made unknown ids silently return None.
            raise KeyError(f"Given id '{id}' is not in the set of agents.") from err

    def get_results(self, cleanup: bool = True) -> Dict[str, pd.DataFrame]:
        """
        Get all results of the agentLogger

        Args:
            cleanup: If true, read files are deleted.

        Returns:
            Dict[str, pd.DataFrame]: key is the agent_id, value the dataframe
        """
        results = {}
        for agent in self._agents.values():
            new_res = agent.get_results(cleanup=cleanup)
            results[agent.id] = new_res
        return results
class LocalCloneMAPAgency(LocalMASAgency):
    """
    Local LocalMASAgency agency class which tries to mimic cloneMAP
    behaviour for the local execution.

    Each agent gets its own environment and is started in its own thread as
    soon as it is set up, so ``run`` has nothing left to do.
    """

    # todo-fwu delete or add to clonemap example. But I dont think we need the threads, since we have simpy

    def run(self, until=None):
        """Do nothing: the agent threads are started in ``setup_agent``."""
        pass  # Already running

    def terminate_agents(self):
        """Only log a message; agents are not terminated in this MAS."""
        logger.info("Can't terminate agents yet in this MAS")

    def setup_agent(self, id: str):
        """Start a thread that creates and runs the agent with the given id.

        Note that ``self._agents[id]`` holds the started ``threading.Thread``
        here, not an ``Agent`` object.

        Args:
            id: id of an agent config previously registered via ``add_agent``.
        """

        # pylint: disable=redefined-builtin
        def _get_ag(env, ag_config):
            # Thread target: build a separate Environment for this agent and
            # run it without an end time.
            ag = Agent(env=Environment(config=env), config=ag_config)
            ag.env.run()
            return ag

        thread = threading.Thread(
            target=_get_ag,
            kwargs={
                "env": self.env.config.model_copy(),
                # model_copy() instead of the deprecated pydantic v1 copy(),
                # consistent with the env config copy above.
                "ag_config": self._agent_configs[id].model_copy(),
            },
        )
        thread.start()
        self._agents[id] = thread
def agent_process(
    agent_config: Union[dict, FilePath],
    until: float,
    env: Union[dict, FilePath],
    results_dict: dict,
    cleanup=True,
    log_level=logging.ERROR,
):
    """
    Create one agent with its own environment, run it and report its results.

    Intended as the target of a separate process: the agent's results are
    written into ``results_dict`` under the agent's id instead of being
    returned.

    Args:
        agent_config: Config for an agent.
        until: Simulation runtime
        env: config for an environment
        results_dict: dict from process manager
        cleanup: passed on to the agent's ``get_results``
        log_level: the log level for this process

    Returns:
        None
    """
    logging.basicConfig(level=log_level)
    agent = Agent(config=agent_config, env=Environment(config=env))
    agent.env.run(until=until)
    # Collect the results before the modules are terminated.
    collected = agent.get_results(cleanup)
    for module in agent.modules:
        module.terminate()
    results_dict[agent.id] = collected
class MultiProcessingMAS(MAS):
    """
    Helper class to conveniently run multi-agent-systems in separate processes.

    Every registered agent is executed by ``agent_process`` in its own
    ``multiprocessing.Process``; results are exchanged through a manager dict.
    """

    # NOTE(review): the default factory yields an Environment instance although
    # the annotation only lists dict / FilePath - confirm that a defaulted env
    # reaches the processes in a usable form.
    env: Union[dict, FilePath] = Field(
        default_factory=lambda: Environment(config={"rt": True}),
        title="env",
        description="The environment for the agents.",
    )
    cleanup: bool = Field(
        default=False,
        description="Whether agents should clean the results files after running.",
    )
    log_level: int = Field(
        default=logging.ERROR, description="Loglevel to set for the processes."
    )

    # Processes created by ``run`` (appended on every call).
    _processes: List[multiprocessing.Process] = PrivateAttr(default=[])
    # Replaced by a manager dict in ``__init__``: agent id -> results.
    _results_dict: Dict[str, pd.DataFrame] = PrivateAttr(default={})

    @field_validator("env")
    @classmethod
    def setup_env(cls, env):
        """Setup the env if a config is given.

        Converts the env into a plain config dict and makes sure real-time
        mode is enabled: a missing ``rt`` key is set to ``True``.

        Raises:
            ValueError: if the config explicitly sets a falsy ``rt``.
        """
        if isinstance(env, Environment):
            env = env.config.model_dump()
        elif isinstance(env, (Path, str)):
            if Path(env).exists():
                with open(env, "r") as f:
                    env = json.load(f)
        # Explicit check instead of `assert`: assert statements are removed
        # under `python -O`, which would drop both the validation and the
        # `setdefault` side effect that enables real-time mode.
        if not env.setdefault("rt", True):
            raise ValueError(
                "Synchronization between processes relies on time, RealTimeEnvironment "
                "is required."
            )
        return env

    def __init__(self, **data: Any) -> None:
        """Initialize the model and create the shared results dict."""
        super().__init__(**data)
        manager = multiprocessing.Manager()
        self._results_dict = manager.dict()

    def run(self, until):
        """Execute the multi-agent-system in parallel and terminate it after
        run is finished.

        Starts one process per registered agent config and blocks until all
        processes have been joined.

        Args:
            until: The time until which each agent's environment should run.
        """
        for agent in self._agent_configs.values():
            kwargs = {
                "agent_config": agent,
                "until": until,
                "env": self.env,
                "results_dict": self._results_dict,
                "cleanup": self.cleanup,
                "log_level": self.log_level,
            }
            process = multiprocessing.Process(
                target=agent_process, name=agent.id, kwargs=kwargs
            )
            self._processes.append(process)
            process.start()
        for process in self._processes:
            process.join()

    def get_results(self) -> Dict[str, pd.DataFrame]:
        """
        Get all results of the agentLogger

        Returns:
            Dict[str, pd.DataFrame]: key is the agent_id, value the dataframe
        """
        return dict(self._results_dict)