Coverage for agentlib/utils/multi_agent_system.py: 62%
144 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-11-07 11:57 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-11-07 11:57 +0000
1"""
2Module containing a local agency to test any LocalMASAgency system
3without the need of cloneMAP.
4"""
6import abc
7import json
8import logging
9import multiprocessing
10import threading
11from pathlib import Path
12from typing import List, Dict, Union, Any
14import pandas as pd
15from pydantic import (
16 field_validator,
17 ConfigDict,
18 BaseModel,
19 PrivateAttr,
20 Field,
21 FilePath,
22)
24from agentlib.core import Agent, Environment
25from agentlib.core.agent import AgentConfig
26from agentlib.utils.load_config import load_config
28logger = logging.getLogger(__name__)
31class MAS(BaseModel):
32 """Parent class for all MAS"""
34 model_config = ConfigDict(arbitrary_types_allowed=True)
36 agent_configs: List[Union[dict, FilePath, str]]
37 env: Union[Environment, dict, FilePath] = Field(
38 default_factory=Environment,
39 title="env",
40 description="The environment for the agents.",
41 )
42 variable_logging: bool = Field(
43 default=False,
44 title="variable_logging",
45 description="Enable variable logging in all agents with sampling rate of environment.",
46 )
47 use_direct_callback_databroker: bool = Field(
48 default=False,
49 description="If True, the `DirectCallbackDataBroker` will be used in all agents"
50 )
51 _agent_configs: Dict[str, AgentConfig] = PrivateAttr(default={})
53 def __init__(self, **data: Any) -> None:
54 """Add all agents as Agent object"""
55 super().__init__(**data)
56 for agent_config in self.agent_configs:
57 self.add_agent(config=agent_config)
59 @field_validator("agent_configs")
60 @classmethod
61 def setup_agents(cls, agent_configs):
62 """Load agent configs and add them."""
63 cfgs = []
64 for cfg in agent_configs:
65 cfgs.append(load_config(cfg, config_type=AgentConfig))
66 return cfgs
68 def add_agent(self, config: AgentConfig):
69 """
70 Add an agent to the local agency with the
71 given agent config.
73 Args:
74 config Dict: agent config
75 """
77 if self.variable_logging:
78 if isinstance(self.env, dict):
79 config = self.add_agent_logger(
80 config=config, sampling=self.env.get("t_sample", 60)
81 )
82 else:
83 config = self.add_agent_logger(
84 config=config, sampling=self.env.config.t_sample
85 )
86 if config.use_direct_callback_databroker and not self.use_direct_callback_databroker:
87 logger.warning(
88 "Agent %s explicitly sets use_direct_callback_databroker=True, "
89 "won't apply the MAS.use_direct_callback_databroker=False setting.",
90 config.id
91 )
92 else:
93 config.use_direct_callback_databroker = self.use_direct_callback_databroker
95 self._agent_configs[config.id] = config.model_copy()
96 logger.info("Registered agent %s in agency", config.id)
98 @staticmethod
99 def add_agent_logger(config: AgentConfig, sampling=60) -> AgentConfig:
100 """Adds the AgentLogger to the list of configs.
102 Args:
103 config dict: The config to be updated
104 sampling=
105 """
106 # Add Logger config
107 filename = f"variable_logs//Agent_{config.id}_Logger.log"
108 cfg = {
109 "module_id": "AgentLogger",
110 "type": "AgentLogger",
111 "t_sample": sampling,
112 "values_only": True,
113 "filename": filename,
114 "overwrite_log": True,
115 "clean_up": False,
116 }
117 config.modules.append(cfg)
118 return config
120 @abc.abstractmethod
121 def run(self, until):
122 """
123 Run the MAS.
124 Args:
125 until: The time until which the simulation should run.
127 Returns:
129 """
130 raise NotImplementedError("'run' is not implemented by the parent class MAS.")
133class LocalMASAgency(MAS):
134 """
135 Local LocalMASAgency agency class which holds the agents in a common environment,
136 executes and terminates them.
137 """
139 _agents: Dict[str, Agent] = PrivateAttr(default={})
141 @field_validator("env")
142 @classmethod
143 def setup_env(cls, env):
144 """Setup the env if a config is given."""
145 if isinstance(env, Environment):
146 return env
147 if isinstance(env, (Path, str)):
148 if Path(env).exists():
149 with open(env, "r") as f:
150 env = json.load(f)
151 return Environment(config=env)
153 def add_agent(self, config: AgentConfig):
154 """Also setup the agent directly"""
155 super().add_agent(config=config)
156 self.setup_agent(id=config.id)
158 def stop_agency(self):
159 """Stop all threads"""
160 logger.info("Stopping agency")
161 self.terminate_agents()
163 def run(self, until):
164 """Execute the LocalMASAgency and terminate it after run is finished"""
165 self.env.run(until=until)
166 self.stop_agency()
168 def __enter__(self):
169 """Enable 'with' statement"""
170 return self
172 def __exit__(self, exc_type, exc_val, exc_tb):
173 """On exit in 'with' statement, stop the agency"""
174 self.stop_agency()
176 def terminate_agents(self):
177 """Terminate all agents modules."""
178 logger.info("Terminating all agent modules")
179 for agent in self._agents.values():
180 agent.terminate()
182 def setup_agent(self, id: str) -> Agent:
183 """Setup the agent matching the given id"""
184 # pylint: disable=redefined-builtin
185 agent = Agent(env=self.env, config=self._agent_configs[id])
186 self._agents[agent.id] = agent
187 return agent
189 def get_agent(self, id: str) -> Agent:
190 """Get the agent matching the given id"""
191 # pylint: disable=redefined-builtin, inconsistent-return-statements
192 try:
193 return self._agents[id]
194 except KeyError:
195 KeyError(f"Given id '{id}' is not in the set of agents.")
197 def get_results(self, cleanup: bool = False) -> Dict[str, pd.DataFrame]:
198 """
199 Get all results of the agentLogger
200 Args:
201 cleanup: If true, read files are deleted.
203 Returns:
204 Dict[str, pd.DataFrame]: key is the agent_id, value the dataframe
205 """
206 results = {}
207 for agent in self._agents.values():
208 new_res = agent.get_results(cleanup=cleanup)
209 results[agent.id] = new_res
210 return results
213class LocalCloneMAPAgency(LocalMASAgency):
214 """
215 Local LocalMASAgency agency class which tries to mimic cloneMAP
216 behaviour for the local execution.
217 """
219 # todo-fwu delete or add to clonemap example. But I dont think we need the threads, since we have simpy
221 def run(self, until=None):
222 pass # Already running
224 def terminate_agents(self):
225 """Terminate all agents modules."""
226 logger.info("Can't terminate agents yet in this MAS")
228 def setup_agent(self, id: str):
229 """Setup the agent matching the given id"""
231 # pylint: disable=redefined-builtin
232 def _get_ag(env, ag_config):
233 ag = Agent(env=Environment(config=env), config=ag_config)
234 ag.env.run()
235 return ag
237 thread = threading.Thread(
238 target=_get_ag,
239 kwargs={
240 "env": self.env.config.model_copy(),
241 "ag_config": self._agent_configs[id].copy(),
242 },
243 )
244 thread.start()
245 self._agents[id] = thread
248def agent_process(
249 agent_config: Union[dict, FilePath],
250 until: float,
251 env: Union[dict, FilePath],
252 results_dict: dict,
253 cleanup=True,
254 log_level=logging.ERROR,
255):
256 """
257 Function to initialize and start an agent in its own process.
258 Collects results from the agent and stores them
259 in the passed results_dict.
260 Args:
261 cleanup:
262 agent_config: Config for an agent.
263 until: Simulation runtime
264 env: config for an environment
265 results_dict: dict from process manager
266 log_level: the log level for this process
268 Returns:
270 """
271 logging.basicConfig(level=log_level)
272 env = Environment(config=env)
273 agent = Agent(config=agent_config, env=env)
274 agent.env.run(until=until)
275 results = agent.get_results(cleanup)
276 for mod in agent.modules:
277 mod.terminate()
278 results_dict[agent.id] = results
281class MultiProcessingMAS(MAS):
282 """
283 Helper class to conveniently run multi-agent-systems in separate processes.
284 """
286 env: Union[dict, FilePath] = Field(
287 default_factory=lambda: Environment(config={"rt": True}),
288 title="env",
289 description="The environment for the agents.",
290 )
291 cleanup: bool = Field(
292 default=False,
293 description="Whether agents should clean the results files after " "running.",
294 )
295 log_level: int = Field(
296 default=logging.ERROR, description="Loglevel to set for the processes."
297 )
299 _processes: List[multiprocessing.Process] = PrivateAttr(default=[])
300 _results_dict: Dict[str, pd.DataFrame] = PrivateAttr(default={})
302 @field_validator("env")
303 @classmethod
304 def setup_env(cls, env):
305 """Setup the env if a config is given."""
306 if isinstance(env, Environment):
307 env = env.config.model_dump()
308 elif isinstance(env, (Path, str)):
309 if Path(env).exists():
310 with open(env, "r") as f:
311 env = json.load(f)
312 assert env.setdefault("rt", True), (
313 "Synchronization between processes relies on time, RealTimeEnvironment "
314 "is required."
315 )
316 return env
318 def __init__(self, **data: Any) -> None:
319 super().__init__(**data)
320 manager = multiprocessing.Manager()
321 self._results_dict = manager.dict()
323 def run(self, until):
324 """Execute the multi-agent-system in parallel and terminate it after
325 run is finished"""
326 for agent in self._agent_configs.values():
327 kwargs = {
328 "agent_config": agent,
329 "until": until,
330 "env": self.env,
331 "results_dict": self._results_dict,
332 "cleanup": self.cleanup,
333 "log_level": self.log_level,
334 }
335 process = multiprocessing.Process(
336 target=agent_process, name=agent.id, kwargs=kwargs
337 )
338 self._processes.append(process)
339 process.start()
340 for process in self._processes:
341 process.join()
343 def get_results(self) -> Dict[str, pd.DataFrame]:
344 """
345 Get all results of the agentLogger
346 Returns:
347 Dict[str, pd.DataFrame]: key is the agent_id, value the dataframe
348 """
349 return dict(self._results_dict)