Source code for agentlib.utils.multi_agent_system

"""
Module containing a local agency to test any LocalMASAgency system
without the need of cloneMAP.
"""

import abc
import json
import logging
import multiprocessing
import threading
from pathlib import Path
from typing import List, Dict, Union, Any

import pandas as pd
from pydantic import (
    field_validator,
    ConfigDict,
    BaseModel,
    PrivateAttr,
    Field,
    FilePath,
)

from agentlib.core import Agent, Environment
from agentlib.core.agent import AgentConfig
from agentlib.utils.load_config import load_config

logger = logging.getLogger(__name__)


[docs]class MAS(BaseModel): """Parent class for all MAS""" model_config = ConfigDict(arbitrary_types_allowed=True) agent_configs: List[Union[dict, FilePath, str]] env: Union[Environment, dict, FilePath] = Field( default_factory=Environment, title="env", description="The environment for the agents.", ) variable_logging: bool = Field( default=False, title="variable_logging", description="Enable variable logging in all agents with sampling rate of environment.", ) _agent_configs: Dict[str, AgentConfig] = PrivateAttr(default={}) def __init__(self, **data: Any) -> None: """Add all agents as Agent object""" super().__init__(**data) for agent_config in self.agent_configs: self.add_agent(config=agent_config)
[docs] @field_validator("agent_configs") @classmethod def setup_agents(cls, agent_configs): """Load agent configs and add them.""" cfgs = [] for cfg in agent_configs: cfgs.append(load_config(cfg, config_type=AgentConfig)) return cfgs
[docs] def add_agent(self, config: AgentConfig): """ Add an agent to the local agency with the given agent config. Args: config Dict: agent config """ if self.variable_logging: if isinstance(self.env, dict): config = self.add_agent_logger( config=config, sampling=self.env.get("t_sample", 60) ) else: config = self.add_agent_logger( config=config, sampling=self.env.config.t_sample ) self._agent_configs[config.id] = config.model_copy() logger.info("Registered agent %s in agency", config.id)
[docs] @staticmethod def add_agent_logger(config: AgentConfig, sampling=60) -> AgentConfig: """Adds the AgentLogger to the list of configs. Args: config dict: The config to be updated sampling= """ # Add Logger config filename = f"variable_logs//Agent_{config.id}_Logger.log" cfg = { "module_id": "AgentLogger", "type": "AgentLogger", "t_sample": sampling, "values_only": True, "filename": filename, "overwrite_log": True, "clean_up": False, } config.modules.append(cfg) return config
[docs] @abc.abstractmethod def run(self, until): """ Run the MAS. Args: until: The time until which the simulation should run. Returns: """ raise NotImplementedError("'run' is not implemented by the parent class MAS.")
[docs]class LocalMASAgency(MAS): """ Local LocalMASAgency agency class which holds the agents in a common environment, executes and terminates them. """ _agents: Dict[str, Agent] = PrivateAttr(default={})
[docs] @field_validator("env") @classmethod def setup_env(cls, env): """Setup the env if a config is given.""" if isinstance(env, Environment): return env if isinstance(env, (Path, str)): if Path(env).exists(): with open(env, "r") as f: env = json.load(f) return Environment(config=env)
[docs] def add_agent(self, config: AgentConfig): """Also setup the agent directly""" super().add_agent(config=config) self.setup_agent(id=config.id)
[docs] def stop_agency(self): """Stop all threads""" logger.info("Stopping agency") self.terminate_agents()
[docs] def run(self, until): """Execute the LocalMASAgency and terminate it after run is finished""" self.env.run(until=until) self.stop_agency()
def __enter__(self): """Enable 'with' statement""" return self def __exit__(self, exc_type, exc_val, exc_tb): """On exit in 'with' statement, stop the agency""" self.stop_agency()
[docs] def terminate_agents(self): """Terminate all agents modules.""" logger.info("Terminating all agent modules") for agent in self._agents.values(): agent.terminate()
[docs] def setup_agent(self, id: str) -> Agent: """Setup the agent matching the given id""" # pylint: disable=redefined-builtin agent = Agent(env=self.env, config=self._agent_configs[id]) self._agents[agent.id] = agent return agent
[docs] def get_agent(self, id: str) -> Agent: """Get the agent matching the given id""" # pylint: disable=redefined-builtin, inconsistent-return-statements try: return self._agents[id] except KeyError: KeyError(f"Given id '{id}' is not in the set of agents.")
[docs] def get_results(self, cleanup: bool = True) -> Dict[str, pd.DataFrame]: """ Get all results of the agentLogger Args: cleanup: If true, read files are deleted. Returns: Dict[str, pd.DataFrame]: key is the agent_id, value the dataframe """ results = {} for agent in self._agents.values(): new_res = agent.get_results(cleanup=cleanup) results[agent.id] = new_res return results
[docs]class LocalCloneMAPAgency(LocalMASAgency): """ Local LocalMASAgency agency class which tries to mimic cloneMAP behaviour for the local execution. """ # todo-fwu delete or add to clonemap example. But I dont think we need the threads, since we have simpy
[docs] def run(self, until=None): pass # Already running
[docs] def terminate_agents(self): """Terminate all agents modules.""" logger.info("Can't terminate agents yet in this MAS")
[docs] def setup_agent(self, id: str): """Setup the agent matching the given id""" # pylint: disable=redefined-builtin def _get_ag(env, ag_config): ag = Agent(env=Environment(config=env), config=ag_config) ag.env.run() return ag thread = threading.Thread( target=_get_ag, kwargs={ "env": self.env.config.model_copy(), "ag_config": self._agent_configs[id].copy(), }, ) thread.start() self._agents[id] = thread
[docs]def agent_process( agent_config: Union[dict, FilePath], until: float, env: Union[dict, FilePath], results_dict: dict, cleanup=True, log_level=logging.ERROR, ): """ Function to initialize and start an agent in its own process. Collects results from the agent and stores them in the passed results_dict. Args: cleanup: agent_config: Config for an agent. until: Simulation runtime env: config for an environment results_dict: dict from process manager log_level: the log level for this process Returns: """ logging.basicConfig(level=log_level) env = Environment(config=env) agent = Agent(config=agent_config, env=env) agent.env.run(until=until) results = agent.get_results(cleanup) for mod in agent.modules: mod.terminate() results_dict[agent.id] = results
[docs]class MultiProcessingMAS(MAS): """ Helper class to conveniently run multi-agent-systems in separate processes. """ env: Union[dict, FilePath] = Field( default_factory=lambda: Environment(config={"rt": True}), title="env", description="The environment for the agents.", ) cleanup: bool = Field( default=False, description="Whether agents should clean the results files after " "running.", ) log_level: int = Field( default=logging.ERROR, description="Loglevel to set for the processes." ) _processes: List[multiprocessing.Process] = PrivateAttr(default=[]) _results_dict: Dict[str, pd.DataFrame] = PrivateAttr(default={})
[docs] @field_validator("env") @classmethod def setup_env(cls, env): """Setup the env if a config is given.""" if isinstance(env, Environment): env = env.config.model_dump() elif isinstance(env, (Path, str)): if Path(env).exists(): with open(env, "r") as f: env = json.load(f) assert env.setdefault("rt", True), ( "Synchronization between processes relies on time, RealTimeEnvironment " "is required." ) return env
def __init__(self, **data: Any) -> None: super().__init__(**data) manager = multiprocessing.Manager() self._results_dict = manager.dict()
[docs] def run(self, until): """Execute the multi-agent-system in parallel and terminate it after run is finished""" for agent in self._agent_configs.values(): kwargs = { "agent_config": agent, "until": until, "env": self.env, "results_dict": self._results_dict, "cleanup": self.cleanup, "log_level": self.log_level, } process = multiprocessing.Process( target=agent_process, name=agent.id, kwargs=kwargs ) self._processes.append(process) process.start() for process in self._processes: process.join()
[docs] def get_results(self) -> Dict[str, pd.DataFrame]: """ Get all results of the agentLogger Returns: Dict[str, pd.DataFrame]: key is the agent_id, value the dataframe """ return dict(self._results_dict)