Coverage for agentlib/core/agent.py: 89%

122 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1""" 

2Module containing only the Agent class. 

3""" 

4 

5import json 

6import threading 

7from typing import Union, List, Dict, TypeVar, Optional 

8 

9from pathlib import Path 

10from pydantic import field_validator, BaseModel, FilePath, Field 

11 

12import agentlib 

13import agentlib.core.logging_ as agentlib_logging 

14from agentlib.core import ( 

15 Environment, 

16 LocalDataBroker, 

17 RTDataBroker, 

18 BaseModule, 

19 DataBroker, 

20) 

21from agentlib.core.environment import CustomSimpyEnvironment 

22from agentlib.utils import custom_injection 

23from agentlib.utils.load_config import load_config 

24 

25BaseModuleClass = TypeVar("BaseModuleClass", bound=BaseModule) 

26 

27 

class AgentConfig(BaseModel):
    """
    Class containing settings / config for an Agent.

    Fields:
        id: Identifier of the agent.
        modules: Configs of the agent's modules, given either as dicts or
            as paths to JSON files. Defaults to None when omitted.
        check_alive_interval: Interval passed to the environment timeout
            between two checks of the agent's registered threads.
        max_queue_size: Limit handed to the data broker for its queues;
            -1 means no limit.
    """

    id: Union[str, int] = Field(
        title="id",
        description="The ID of the Agent, should be unique in "
        "the multi-agent-system the agent is living in.",
    )
    # No Field() here: when omitted, the value stays None.
    modules: List[Union[Dict, FilePath]] = None
    check_alive_interval: float = Field(
        title="check_alive_interval",
        default=1,
        ge=0,
        description="Check every other check_alive_interval second "
        "if the threads of the agent are still alive. "
        "If that's not the case, exit the main thread of the "
        "agent. Updating this value at runtime will "
        "not work as all processes have already been started.",
    )
    max_queue_size: Optional[int] = Field(
        default=1000,
        ge=-1,
        description="Maximal number of waiting items in data-broker queues. "
        "Set to -1 for infinity",
    )

    @field_validator("modules")
    @classmethod
    def check_modules(cls, modules: List):
        """
        Validator to ensure all modules are in dict-format.

        Entries that are a ``str`` or ``Path`` are replaced by parsed JSON:
        if the value points to an existing path, that file is read,
        otherwise the string itself is parsed as JSON. All other entries
        are passed through unchanged.

        Args:
            modules (List): The module entries to normalize

        Returns:
            list: The module entries with file / string entries parsed
        """
        modules_loaded = []
        for module in modules:
            if isinstance(module, (str, Path)):
                if Path(module).exists():
                    # JSON files are UTF-8 by specification; do not rely on
                    # the platform's locale encoding when reading them.
                    with open(module, "r", encoding="utf-8") as f:
                        module = json.load(f)
                else:
                    module = json.loads(module)
            modules_loaded.append(module)
        return modules_loaded

72 

73 

class Agent:
    """
    The base class for all reactive agent implementations.

    Args:
        config (Union[AgentConfig, FilePath, str, dict]):
            A config object to initialize the agents config
        env (Environment): The environment the agent is running in
    """

    def __init__(self, *, config, env: Environment):
        """
        Create instance of Agent

        Creates the logger and the data broker, then sets the config (which
        registers all modules). If ``env.config.rt`` is set, the data
        broker thread is registered and the thread monitoring process is
        added to the environment.
        """
        self._modules = {}
        self._threads: Dict[str, threading.Thread] = {}
        self.env = env
        self.is_alive = True
        config: AgentConfig = load_config(config, config_type=AgentConfig)
        # Setup the logger before anything can call register_thread(),
        # which logs a warning for non-daemon threads and would otherwise
        # fail with an AttributeError.
        self.logger = agentlib_logging.create_logger(env=self.env, name=config.id)
        data_broker_logger = agentlib_logging.create_logger(
            env=self.env, name=f"{config.id}/DataBroker"
        )
        if env.config.rt:
            self._data_broker = RTDataBroker(
                env=env, logger=data_broker_logger, max_queue_size=config.max_queue_size
            )
            self.register_thread(thread=self._data_broker.thread)
        else:
            self._data_broker = LocalDataBroker(
                env=env, logger=data_broker_logger, max_queue_size=config.max_queue_size
            )
        # Update modules (the config setter registers them)
        self.config = config

        # Register the thread monitoring if configured
        if env.config.rt:
            self.env.process(self._monitor_threads())

    @property
    def id(self) -> Union[str, int]:
        """
        Getter for current agent's id

        Returns:
            Union[str, int]: current id of agent, as given in the config
        """
        return self.config.id

    def __repr__(self):
        return f"Agent {self.id}"

    @property
    def config(self) -> AgentConfig:
        """
        Get the config (AgentConfig) of the agent

        Returns:
            AgentConfig: An instance of AgentConfig
        """
        return self._config

    @config.setter
    def config(self, config: Union[AgentConfig, FilePath, str, dict]):
        """
        Set the config of the agent.
        As relevant info may be updated, all modules
        are re-registered.

        Args:
            config (Union[AgentConfig, FilePath, str, dict]):
                Essentially any object which can be parsed by pydantic
        """
        self._config = load_config(config, config_type=AgentConfig)
        self._register_modules()

    @property
    def data_broker(self) -> DataBroker:
        """
        Get the data_broker of the agent

        Returns:
            DataBroker: An instance of the DataBroker class
        """
        return self._data_broker

    @property
    def env(self) -> CustomSimpyEnvironment:
        """
        Get the environment the agent is in

        Returns:
            Environment: The environment instance
        """
        return self._env

    @env.setter
    def env(self, env: Environment):
        """
        Set the environment of the agent

        Args:
            env (Environment): The environment instance
        """
        self._env = env

    @property
    def modules(self) -> List[BaseModuleClass]:
        """
        Get all modules of agent

        Returns:
            List[BaseModule]: List of all modules
        """
        return list(self._modules.values())

    def get_module(self, module_id: str) -> Optional[BaseModuleClass]:
        """
        Get the module by given module_id.
        If no such module exists, None is returned

        Args:
            module_id (str): Id of the module to return
        Returns:
            Optional[BaseModule]: Module with the given name, or None
        """
        return self._modules.get(module_id)

    def register_thread(self, thread: threading.Thread):
        """
        Registers the given thread to the dictionary of threads
        which need to run in order for the agent
        to work.

        Args:
            thread threading.Thread:
                The thread object

        Raises:
            KeyError: If a thread with the same name is already registered
        """
        name = thread.name
        if name in self._threads:
            raise KeyError(
                f"Given thread with name '{name}' is already a registered thread"
            )
        if not thread.daemon:
            self.logger.warning(
                "'%s' is not a daemon thread. "
                "If the agent raises an error, the thread will keep running.",
                name,
            )
        self._threads[name] = thread

    def _monitor_threads(self):
        """
        Process loop to monitor the threads of the agent.

        Generator that checks all registered threads, then yields an
        environment timeout of ``check_alive_interval`` before checking
        again.

        Raises:
            RuntimeError: As soon as one registered thread is not alive
        """
        while True:
            for name, thread in self._threads.items():
                if not thread.is_alive():
                    msg = (
                        f"The thread {name} is not alive anymore. Exiting agent. "
                        f"Check errors above for possible reasons"
                    )
                    self.logger.critical(msg)
                    self.is_alive = False
                    raise RuntimeError(msg)
            yield self.env.timeout(self.config.check_alive_interval)

    def _register_modules(self):
        """
        Function to register all modules from the
        current config.
        The module_ids need to be unique inside the
        agents config.
        The agent object (self) is passed to the modules.
        This is the reason the function is not inside the
        validator.

        Raises:
            KeyError: If a module_id occurs more than once in the config
        """
        seen_module_ids = set()
        # ``modules`` defaults to None in AgentConfig; treat that as
        # "no modules" instead of failing on iteration.
        for module_config in self.config.modules or []:
            module_cls = get_module_class(module_config=module_config)
            _module_id = module_config.get("module_id", module_cls.__name__)

            # Insert default module id if it did not exist:
            module_config.update({"module_id": _module_id})

            if _module_id in seen_module_ids:
                raise KeyError(
                    f"Module with module_id '{_module_id}' "
                    f"exists multiple times inside agent "
                    f"{self.id}. Use unique names only."
                )
            seen_module_ids.add(_module_id)

            if _module_id in self._modules:
                # Update the config of the already existing module:
                self.get_module(_module_id).config = module_config
            else:
                # Add the module to the registered modules
                self._modules[_module_id] = module_cls(
                    agent=self, config=module_config
                )

    def get_results(self, cleanup=True):
        """
        Gets the results of this agent.

        Args:
            cleanup: If true, created files are deleted.

        Returns:
            dict: Results keyed by module id; modules whose result is
            None are left out
        """
        results = {}
        for module in self.modules:
            result = module.get_results()
            if result is not None:
                results[module.id] = result
        if cleanup:
            self.clean_results()
        return results

    def clean_results(self):
        """
        Calls the cleanup_results function of all modules, removing files that
        were created by them.

        Errors raised by a module are logged and do not stop the cleanup
        of the remaining modules.
        """
        for module in self.modules:
            try:
                module.cleanup_results()
            # Only swallow regular errors; KeyboardInterrupt and SystemExit
            # must still be able to stop the program.
            except Exception as e:
                self.logger.error(
                    f"Could not cleanup results for the following module: {module.id}. "
                    f"The reason is the following exception: {e}"
                )

    def terminate(self):
        """Calls the terminate function of all modules."""
        for module in self.modules:
            module.terminate()

310 

311 

def get_module_class(module_config):
    """
    Return the Module-Class object for the given config.

    The ``"type"`` entry of the config decides how the class is resolved:
    a string is case-folded and passed to ``agentlib.modules.get_module_type``,
    a dict is passed to ``custom_injection``.

    Args:
        module_config (dict): Config of the module to return
    Returns:
        BaseModule: Module-Class object
    Raises:
        TypeError: If the ``"type"`` entry is neither a str nor a dict
            (this includes a missing entry)
    """
    module_type = module_config.get("type")

    if isinstance(module_type, str):
        # Get the module-class from the agentlib
        return agentlib.modules.get_module_type(module_type=module_type.casefold())

    if isinstance(module_type, dict):
        # Load module class
        return custom_injection(config=module_type)

    raise TypeError(
        f"Given module type is of type '{type(module_type)}' "
        f"but should be str or dict."
    )