Coverage for agentlib/core/agent.py: 89%
122 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1"""
2Module containing only the Agent class.
3"""
5import json
6import threading
7from typing import Union, List, Dict, TypeVar, Optional
9from pathlib import Path
10from pydantic import field_validator, BaseModel, FilePath, Field
12import agentlib
13import agentlib.core.logging_ as agentlib_logging
14from agentlib.core import (
15 Environment,
16 LocalDataBroker,
17 RTDataBroker,
18 BaseModule,
19 DataBroker,
20)
21from agentlib.core.environment import CustomSimpyEnvironment
22from agentlib.utils import custom_injection
23from agentlib.utils.load_config import load_config
# Type variable bound to BaseModule; used below to annotate the module
# objects returned by Agent.modules and Agent.get_module.
BaseModuleClass = TypeVar("BaseModuleClass", bound=BaseModule)
class AgentConfig(BaseModel):
    """
    Class containing settings / config for an Agent.

    Fields:
        id: Identifier of the agent.
        modules: Module configs, given as dicts or as paths to JSON files.
        check_alive_interval: Interval used when checking the agent's threads.
        max_queue_size: Maximal number of waiting items in data-broker queues.
    """

    id: Union[str, int] = Field(
        title="id",
        description="The ID of the Agent, should be unique in "
        "the multi-agent-system the agent is living in.",
    )
    # NOTE: the default is None (not an empty list) when no modules are given.
    modules: List[Union[Dict, FilePath]] = None
    check_alive_interval: float = Field(
        title="check_alive_interval",
        default=1,
        ge=0,
        description="Check every other check_alive_interval second "
        "if the threads of the agent are still alive. "
        "If that's not the case, exit the main thread of the "
        "agent. Updating this value at runtime will "
        "not work as all processes have already been started.",
    )
    max_queue_size: Optional[int] = Field(
        default=1000,
        ge=-1,
        description="Maximal number of waiting items in data-broker queues. "
        "Set to -1 for infinity",
    )

    @field_validator("modules")
    @classmethod
    def check_modules(cls, modules: List):
        """
        Validator to ensure all modules are in dict-format.

        Entries that are strings or paths are replaced by their parsed JSON
        content; all other entries are passed through unchanged.

        Args:
            modules (List): The configured modules.

        Returns:
            list: The modules with every str / Path entry replaced by the
            object parsed from JSON.
        """
        modules_loaded = []
        for module in modules:
            if isinstance(module, (str, Path)):
                if Path(module).exists():
                    # Decode explicitly as UTF-8 instead of relying on the
                    # platform's locale encoding.
                    with open(module, "r", encoding="utf-8") as f:
                        module = json.load(f)
                else:
                    # Not an existing path: parse the value itself as JSON.
                    module = json.loads(module)
            modules_loaded.append(module)
        return modules_loaded
class Agent:
    """
    The base class for all reactive agent implementations.

    Args:
        config (Union[AgentConfig, FilePath, str, dict]):
            A config object to initialize the agents config
        env (Environment): The environment the agent is running in
    """

    def __init__(self, *, config, env: Environment):
        """
        Create instance of Agent
        """
        self._modules = {}
        self._threads: Dict[str, threading.Thread] = {}
        self.env = env
        self.is_alive = True
        config: AgentConfig = load_config(config, config_type=AgentConfig)
        # Setup the logger before anything registers a thread:
        # register_thread() logs a warning for non-daemon threads and would
        # otherwise fail because self.logger does not exist yet.
        self.logger = agentlib_logging.create_logger(env=self.env, name=config.id)
        data_broker_logger = agentlib_logging.create_logger(
            env=self.env, name=f"{config.id}/DataBroker"
        )
        if env.config.rt:
            self._data_broker = RTDataBroker(
                env=env, logger=data_broker_logger, max_queue_size=config.max_queue_size
            )
            self.register_thread(thread=self._data_broker.thread)
        else:
            self._data_broker = LocalDataBroker(
                env=env, logger=data_broker_logger, max_queue_size=config.max_queue_size
            )
        # Setting the config also registers the modules. This happens after
        # the data broker has been created.
        self.config = config

        # Register the thread monitoring if env.config.rt is set
        if env.config.rt:
            self.env.process(self._monitor_threads())

    @property
    def id(self) -> Union[str, int]:
        """
        Getter for current agent's id

        Returns:
            Union[str, int]: current id of agent
        """
        return self.config.id

    def __repr__(self):
        return f"Agent {self.id}"

    @property
    def config(self) -> AgentConfig:
        """
        Get the config (AgentConfig) of the agent

        Returns:
            AgentConfig: An instance of AgentConfig
        """
        return self._config

    @config.setter
    def config(self, config: Union[AgentConfig, FilePath, str, dict]):
        """
        Set the config of the agent.
        As relevant info may be updated, all modules
        are re-registered.

        Args:
            config (Union[AgentConfig, FilePath, str, dict]):
                Essentially any object which can be parsed by pydantic
        """
        # Set the config
        self._config = load_config(config, config_type=AgentConfig)
        self._register_modules()

    @property
    def data_broker(self) -> DataBroker:
        """
        Get the data_broker of the agent

        Returns:
            DataBroker: An instance of the DataBroker class
        """
        return self._data_broker

    @property
    def env(self) -> CustomSimpyEnvironment:
        """
        Get the environment the agent is in

        Returns:
            Environment: The environment instance
        """
        return self._env

    @env.setter
    def env(self, env: Environment):
        """
        Set the environment of the agent

        Args:
            env (Environment): The environment instance
        """
        self._env = env

    @property
    def modules(self) -> List[BaseModuleClass]:
        """
        Get all modules of agent

        Returns:
            List[BaseModule]: List of all modules
        """
        return list(self._modules.values())

    def get_module(self, module_id: str) -> Optional[BaseModuleClass]:
        """
        Get the module by given module_id.
        If no such module exists, None is returned

        Args:
            module_id (str): Id of the module to return

        Returns:
            Optional[BaseModule]: Module with the given id, or None
        """
        return self._modules.get(module_id)

    def register_thread(self, thread: threading.Thread):
        """
        Registers the given thread to the dictionary of threads
        which need to run in order for the agent
        to work.

        Args:
            thread (threading.Thread):
                The thread object

        Raises:
            KeyError: If a thread with the same name is already registered.
        """
        name = thread.name
        if name in self._threads:
            raise KeyError(
                f"Given thread with name '{name}' is already a registered thread"
            )
        if not thread.daemon:
            self.logger.warning(
                "'%s' is not a daemon thread. "
                "If the agent raises an error, the thread will keep running.",
                name,
            )
        self._threads[name] = thread

    def _monitor_threads(self):
        """
        Process loop to monitor the threads of the agent.

        Checks all registered threads every ``check_alive_interval`` of the
        environment's time.

        Raises:
            RuntimeError: As soon as one registered thread is not alive.
        """
        while True:
            for name, thread in self._threads.items():
                if not thread.is_alive():
                    msg = (
                        f"The thread {name} is not alive anymore. Exiting agent. "
                        f"Check errors above for possible reasons"
                    )
                    self.logger.critical(msg)
                    self.is_alive = False
                    raise RuntimeError(msg)
            yield self.env.timeout(self.config.check_alive_interval)

    def _register_modules(self):
        """
        Function to register all modules from the
        current config.
        The module_ids need to be unique inside the
        agents config.
        The agent object (self) is passed to the modules.
        This is the reason the function is not inside the
        validator.

        Raises:
            KeyError: If a module_id occurs more than once in the config.
        """
        seen_module_ids = set()
        # The config's modules default to None; treat that as "no modules".
        for module_config in self.config.modules or []:
            module_cls = get_module_class(module_config=module_config)
            # Insert default module id if it did not exist:
            _module_id = module_config.setdefault("module_id", module_cls.__name__)

            if _module_id in seen_module_ids:
                raise KeyError(
                    f"Module with module_id '{_module_id}' "
                    f"exists multiple times inside agent "
                    f"{self.id}. Use unique names only."
                )
            seen_module_ids.add(_module_id)

            if _module_id in self._modules:
                # Update the config of the already existing module:
                self._modules[_module_id].config = module_config
            else:
                # Create the module and add it to the registered modules
                self._modules[_module_id] = module_cls(
                    agent=self, config=module_config
                )

    def get_results(self, cleanup=True):
        """
        Gets the results of this agent.

        Args:
            cleanup: If true, created files are deleted.

        Returns:
            dict: Maps the id of each module to its results. Modules whose
            results are None are left out.
        """
        results = {}
        for module in self.modules:
            result = module.get_results()
            if result is not None:
                results[module.id] = result
        if cleanup:
            self.clean_results()
        return results

    def clean_results(self):
        """
        Calls the cleanup_results function of all modules, removing files that
        were created by them.

        Errors of single modules are logged and do not stop the cleanup of
        the remaining modules.
        """
        for module in self.modules:
            try:
                module.cleanup_results()
            # Only catch regular errors, so KeyboardInterrupt and SystemExit
            # still propagate.
            except Exception as e:
                self.logger.error(
                    "Could not cleanup results for the following module: %s. "
                    "The reason is the following exception: %s",
                    module.id,
                    e,
                )

    def terminate(self):
        """Calls the terminate function of all modules."""
        for module in self.modules:
            module.terminate()
def get_module_class(module_config):
    """
    Look up the Module-Class object that belongs to a module config.

    The "type" entry of the config decides how the class is found:
    a string is resolved through the agentlib module registry
    (case-insensitive), a dict is handed to the custom injection.

    Args:
        module_config (dict): Config of the module to return

    Returns:
        BaseModule: Module-Class object

    Raises:
        TypeError: If the "type" entry is neither a str nor a dict.
    """
    module_type = module_config.get("type")

    if isinstance(module_type, dict):
        # Custom module: load the class described by the dict
        return custom_injection(config=module_type)

    if isinstance(module_type, str):
        # Module shipped with / registered in the agentlib
        return agentlib.modules.get_module_type(module_type=module_type.casefold())

    raise TypeError(
        f"Given module type is of type '{type(module_type)}' "
        f"but should be str or dict."
    )