Source code for agentlib.core.agent

"""
Module containing only the Agent class.
"""

import json
import threading
from copy import deepcopy
from pathlib import Path
from typing import Union, List, Dict, TypeVar, Optional

from pydantic import field_validator, BaseModel, FilePath, Field
from pydantic_core.core_schema import FieldValidationInfo

import agentlib
import agentlib.core.logging_ as agentlib_logging
from agentlib.core import (
    Environment,
    LocalDataBroker,
    RTDataBroker,
    DirectCallbackDataBroker,
    BaseModule,
    DataBroker,
)
from agentlib.core.environment import CustomSimpyEnvironment
from agentlib.core.errors import ConfigurationError
from agentlib.utils import custom_injection
from agentlib.utils.load_config import load_config

BaseModuleClass = TypeVar("BaseModuleClass", bound=BaseModule)


class AgentConfig(BaseModel):
    """
    Class containing the settings / config for an Agent.
    Contains the agent's id and modules, plus runtime options for
    thread monitoring and the data-broker.
    """

    id: Union[str, int] = Field(
        title="id",
        description="The ID of the Agent, should be unique in "
        "the multi-agent-system the agent is living in.",
    )
    modules: Union[List[Union[Dict, FilePath]], Dict[str, Union[Dict, FilePath]]] = (
        Field(
            default_factory=list,
            description="A list or dictionary of modules. If a dictionary is "
            "provided, its keys are treated as module_ids.",
        )
    )
    check_alive_interval: float = Field(
        title="check_alive_interval",
        default=1,
        ge=0,
        description="Check every check_alive_interval seconds "
        "whether the threads of the agent are still alive. "
        "If that is not the case, exit the main thread of the agent. "
        "Updating this value at runtime will not work, "
        "as all processes have already been started.",
    )
    max_queue_size: Optional[int] = Field(
        default=1000,
        ge=-1,
        description="Maximal number of waiting items in data-broker queues. "
        "Set to -1 for infinity.",
    )
    use_direct_callback_databroker: bool = Field(
        default=False,
        description="If True, the `DirectCallbackDataBroker` will be used.",
    )

    @field_validator("modules")
    @classmethod
    def check_modules(cls, modules: Union[List, Dict], info: FieldValidationInfo):
        """Validator to ensure all modules are in dict format and include a 'module_id'."""
        modules_loaded = []
        if isinstance(modules, dict):
            for module_id, module in modules.items():
                if isinstance(module, (str, Path)):
                    # Module given as a file path or a raw JSON string
                    if Path(module).exists():
                        with open(module, "r") as f:
                            module = json.load(f)
                    else:
                        module = json.loads(module)
                if isinstance(module, dict):
                    module = deepcopy(module)
                    if "module_id" in module and not module["module_id"] == module_id:
                        agent = info.data["id"]
                        raise ConfigurationError(
                            f"Provided agent {agent} has ambiguous module_id. Module "
                            f"config was declared with dict key {module_id} but "
                            f"contains different module_id {module['module_id']} "
                            f"within config."
                        )
                    module["module_id"] = module_id
                modules_loaded.append(module)
        elif isinstance(modules, list):
            for module in modules:
                if isinstance(module, (str, Path)):
                    # Module given as a file path or a raw JSON string
                    if Path(module).exists():
                        with open(module, "r") as f:
                            module = json.load(f)
                    else:
                        module = json.loads(module)
                modules_loaded.append(module)
        else:
            raise TypeError("Modules must be a list or a dict")
        return modules_loaded
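

# Editorial usage sketch, not part of agentlib: how the two accepted module
# layouts look in practice. The module type "simulator" is an assumed
# placeholder; any registered module type (or a custom-injection dict, see
# get_module_class below) works the same way.
def _example_agent_config_usage() -> AgentConfig:
    # Dict form: the key is taken as the module_id by the check_modules validator.
    cfg_from_dict = AgentConfig(
        id="example_agent",
        modules={"sim": {"type": "simulator"}},
    )
    # After validation, cfg_from_dict.modules is normalized to a list of dicts:
    # [{"type": "simulator", "module_id": "sim"}]

    # Equivalent list form with an explicit module_id:
    cfg_from_list = AgentConfig(
        id="example_agent",
        modules=[{"module_id": "sim", "type": "simulator"}],
    )
    return cfg_from_list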


class Agent:
    """
    The base class for all reactive agent implementations.

    Args:
        config (Union[AgentConfig, FilePath, str, dict]):
            A config object to initialize the agent's config
        env (Environment): The environment the agent is running in
    """

    def __init__(self, *, config, env: Environment):
        """Create an instance of Agent."""
        self._modules = {}
        self._threads: Dict[str, threading.Thread] = {}
        self.env = env
        self.is_alive = True
        config: AgentConfig = load_config(config, config_type=AgentConfig)
        data_broker_logger = agentlib_logging.create_logger(
            env=self.env, name=f"{config.id}/DataBroker"
        )
        # Choose the data-broker implementation based on environment and config
        if env.config.rt:
            if config.use_direct_callback_databroker:
                raise ValueError(
                    "Cannot use the direct callback databroker in real-time"
                )
            self._data_broker = RTDataBroker(
                env=env,
                logger=data_broker_logger,
                max_queue_size=config.max_queue_size,
            )
            self.register_thread(thread=self._data_broker.thread)
        elif config.use_direct_callback_databroker:
            self._data_broker = DirectCallbackDataBroker(logger=data_broker_logger)
        else:
            self._data_broker = LocalDataBroker(
                env=env,
                logger=data_broker_logger,
                max_queue_size=config.max_queue_size,
            )
        # Update modules
        self.config = config
        # Setup logger
        self.logger = agentlib_logging.create_logger(env=self.env, name=self.id)
        # Register the thread monitoring if configured
        if env.config.rt:
            self.env.process(self._monitor_threads())

    @property
    def id(self) -> str:
        """
        Getter for the current agent's id

        Returns:
            str: The current id of the agent
        """
        return self.config.id

    def __repr__(self):
        return f"Agent {self.id}"

    @property
    def config(self) -> AgentConfig:
        """
        Get the config (AgentConfig) of the agent

        Returns:
            AgentConfig: An instance of AgentConfig
        """
        return self._config

    @config.setter
    def config(self, config: Union[AgentConfig, FilePath, str, dict]):
        """
        Set the config of the agent.
        As relevant info may be updated, all modules are re-registered.

        Args:
            config (Union[AgentConfig, FilePath, str, dict]):
                Essentially any object which can be parsed by pydantic
        """
        # Set the config
        self._config = load_config(config, config_type=AgentConfig)
        self._register_modules()

    @property
    def data_broker(self) -> DataBroker:
        """
        Get the data_broker of the agent

        Returns:
            DataBroker: An instance of the DataBroker class
        """
        return self._data_broker

    @property
    def env(self) -> CustomSimpyEnvironment:
        """
        Get the environment the agent is in

        Returns:
            Environment: The environment instance
        """
        return self._env

    @env.setter
    def env(self, env: Environment):
        """
        Set the environment of the agent

        Args:
            env (Environment): The environment instance
        """
        self._env = env

    @property
    def modules(self) -> List[BaseModuleClass]:
        """
        Get all modules of the agent

        Returns:
            List[BaseModule]: List of all modules
        """
        return list(self._modules.values())

    def get_module(self, module_id: str) -> BaseModuleClass:
        """
        Get the module by given module_id.
        If no such module exists, None is returned.

        Args:
            module_id (str): Id of the module to return

        Returns:
            BaseModule: Module with the given name
        """
        return self._modules.get(module_id, None)

    def register_thread(self, thread: threading.Thread):
        """
        Registers the given thread to the dictionary of threads
        which need to run in order for the agent to work.

        Args:
            thread (threading.Thread): The thread object
        """
        name = thread.name
        if name in self._threads:
            raise KeyError(
                f"Given thread with name '{name}' is already a registered thread"
            )
        if not thread.daemon:
            self.logger.warning(
                "'%s' is not a daemon thread. "
                "If the agent raises an error, the thread will keep running.",
                name,
            )
        self._threads[name] = thread

    def _monitor_threads(self):
        """Process loop to monitor the threads of the agent."""
        while True:
            for name, thread in self._threads.items():
                if not thread.is_alive():
                    msg = (
                        f"The thread {name} is not alive anymore. Exiting agent. "
                        f"Check errors above for possible reasons"
                    )
                    self.logger.critical(msg)
                    self.is_alive = False
                    raise RuntimeError(msg)
            yield self.env.timeout(self.config.check_alive_interval)

    def _register_modules(self):
        """
        Function to register all modules from the current config.
        The module_ids need to be unique inside the agent's config.
        The agent object (self) is passed to the modules.
        This is the reason the function is not inside the validator.
        """
        updated_modules = []
        for module_config in self.config.modules:
            module_cls = get_module_class(module_config=module_config)
            _module_id = module_config.get("module_id", module_cls.__name__)
            # Insert default module id if it did not exist:
            module_config.update({"module_id": _module_id})
            if _module_id in updated_modules:
                raise KeyError(
                    f"Module with module_id '{_module_id}' "
                    f"exists multiple times inside agent "
                    f"{self.id}. Use unique names only."
                )
            updated_modules.append(_module_id)
            if _module_id in self._modules:
                # Update the config:
                self.get_module(_module_id).config = module_config
            else:
                # Add the module to the dict of modules
                self._modules.update(
                    {_module_id: module_cls(agent=self, config=module_config)}
                )

    def get_results(self, cleanup=False):
        """
        Gets the results of this agent.

        Args:
            cleanup: If true, created files are deleted.
        """
        results = {}
        for module in self.modules:
            try:
                result = module.get_results()
            except BaseException as e:
                self.logger.error(f"Error reading results of module {module.id}: {e}")
                result = None
            if result is not None:
                results[module.id] = result
        if cleanup:
            self.clean_results()
        return results

    def clean_results(self):
        """
        Calls the cleanup_results function of all modules, removing
        files that were created by them.
        """
        for module in self.modules:
            try:
                module.cleanup_results()
            except BaseException as e:
                self.logger.error(
                    f"Could not cleanup results for the following module: {module.id}. "
                    f"The reason is the following exception: {e}"
                )

    def terminate(self):
        """Calls the terminate function of all modules."""
        for module in self.modules:
            module.terminate()
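

# Editorial usage sketch, not part of agentlib: constructing and running an
# agent end to end. The Environment(config=...) call and the "simulator" module
# type are assumptions for illustration; the env.config.rt checks above suggest
# such a config exists, and CustomSimpyEnvironment wraps simpy, whose
# run(until=...) advances simulated time.
def _example_agent_usage() -> dict:
    env = Environment(config={"rt": False})
    agent = Agent(
        config={
            "id": "example_agent",
            "modules": [{"module_id": "sim", "type": "simulator"}],
        },
        env=env,
    )
    env.run(until=10)  # advance the (non-real-time) simulation
    results = agent.get_results(cleanup=True)
    agent.terminate()
    return results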


def get_module_class(module_config):
    """
    Return the Module-Class object for the given config.

    Args:
        module_config (dict): Config of the module to return

    Returns:
        BaseModule: Module-Class object
    """
    _type = module_config.get("type")
    if isinstance(_type, str):
        # Get the module-class from the agentlib
        module_cls = agentlib.modules.get_module_type(module_type=_type.casefold())
    elif isinstance(_type, dict):
        # Load module class
        module_cls = custom_injection(config=_type)
    else:
        raise TypeError(
            f"Given module type is of type '{type(_type)}' "
            f"but should be str or dict."
        )
    return module_cls
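

# Editorial usage sketch, not part of agentlib: the two forms of the "type"
# entry that get_module_class resolves. The plugin name "simulator" and the
# custom-injection keys ("file", "class_name") are assumptions for
# illustration; see agentlib.utils.custom_injection for the exact schema.
def _example_module_class_lookup():
    # Built-in module type, looked up by name in agentlib.modules:
    builtin_cls = get_module_class({"type": "simulator"})

    # Custom module class, injected from a local Python file:
    custom_cls = get_module_class(
        {"type": {"file": "my_modules/controller.py", "class_name": "MyController"}}
    )
    return builtin_cls, custom_cls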