Coverage for agentlib/core/environment.py: 75%

147 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-30 13:00 +0000

1"""This module contains the Environment class, used by all Agents and Modules. 

2 

3This module contains modified code of simpy (https://gitlab.com/team-simpy/simpy). 

4Simpy is distributed under the MIT License 

5 

6 The MIT License (MIT) 

7 

8 Copyright (c) 2013 Ontje Lünsdorf and Stefan Scherfke (also see AUTHORS.txt) 

9 

10 Permission is hereby granted, free of charge, to any person obtaining a copy of 

11 this software and associated documentation files (the "Software"), to deal in 

12 the Software without restriction, including without limitation the rights to 

13 use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of 

14 the Software, and to permit persons to whom the Software is furnished to do so, 

15 subject to the following conditions: 

16 

17 The above copyright notice and this permission notice shall be included in all 

18 copies or substantial portions of the Software. 

19 

20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS 

22 FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR 

23 COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER 

24 IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 

25 CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 

26""" 

27 

28import json 

29import time 

30from datetime import datetime 

31from pathlib import Path 

32from typing import Union, Any, Optional 

33 

34import simpy 

35from pydantic import ( 

36 ConfigDict, 

37 PositiveFloat, 

38 BaseModel, 

39 Field, 

40) 

41from simpy.core import SimTime, Event 

42 

43from agentlib.core import logging_ as agentlib_logging 

44 

# Sentinel marking that no ``until`` value has been stored on an environment yet.
# A dedicated object is used because ``None`` is itself a valid ``until`` argument
# of ``run`` and therefore cannot signal "never set".
UNTIL_UNSET = object()

46 

47 

class EnvironmentConfig(BaseModel):
    """Config for the Environment.

    Unknown keys are rejected and values are re-validated on assignment
    (see ``model_config``).
    """

    # Selects a realtime environment when True, the instant environment otherwise.
    rt: bool = False
    # Forwarded as ``factor`` to the realtime environments; a value of exactly 1
    # selects the unscaled realtime environment.
    factor: PositiveFloat = 1.0
    # Forwarded as ``strict`` to the realtime environments.
    strict: bool = False
    # Interval that the clock processes wait between two iterations.
    # The adjacent string literals are concatenated, so the first part must end
    # with a space to keep the words separated.
    t_sample: PositiveFloat = Field(
        title="t_sample",
        default=60,
        description="Used to increase the now-time of "
        "the environment using the clock function.",
    )
    # Used as ``initial_time`` of the created simpy environment.
    offset: float = Field(
        title="offset",
        default=0,
        description="Used to offset the now-time of "
        "the environment using the clock function.",
    )
    # When True, the logging clock process is started by the environment.
    clock: bool = False
    model_config = ConfigDict(
        validate_assignment=True, arbitrary_types_allowed=True, extra="forbid"
    )

70 

71 

class Environment:
    """Simpy Environment Distributor. Handles synchronous processes.

    Calling ``Environment(config=...)`` does not produce an instance of this
    class. Depending on the config, one of ``InstantEnvironment``,
    ``RealtimeEnvironment`` or ``ScaledRealtimeEnvironment`` is created and
    returned instead.
    """

    def __new__(
        cls, *args, **kwargs
    ) -> Union[
        "InstantEnvironment", "RealtimeEnvironment", "ScaledRealtimeEnvironment"
    ]:
        """Build the environment matching the given config.

        Args:
            *args: Optionally the config as first positional argument. It is
                only used when no ``config`` keyword is given.
            **kwargs: ``config`` may be anything accepted by
                ``make_env_config``; all other keywords are ignored.

        Returns:
            The instant environment if ``config.rt`` is False, the realtime
            environment if ``config.factor`` equals 1, otherwise the scaled
            realtime environment.
        """
        # A positional config used to be dropped silently, which produced a
        # default environment instead of the requested one.
        raw_config = kwargs.get("config", args[0] if args else None)
        config = make_env_config(raw_config)
        if not config.rt:
            return InstantEnvironment(config=config)
        if config.factor == 1:
            return RealtimeEnvironment(config=config)
        return ScaledRealtimeEnvironment(config=config)

84 

85 

def make_env_config(
    config: Union[dict, EnvironmentConfig, str, Path, None],
) -> EnvironmentConfig:
    """Creates the environment config from different sources.

    Args:
        config: One of
            - ``None``: the default config is returned,
            - an ``EnvironmentConfig``: returned unchanged,
            - a ``str`` or ``Path`` pointing to an existing JSON file,
            - a ``str`` containing JSON,
            - any other object (e.g. a dict) that the model can validate.

    Returns:
        The validated environment config.
    """
    if config is None:
        return EnvironmentConfig()
    if isinstance(config, EnvironmentConfig):
        return config
    if isinstance(config, (str, Path)):
        try:
            points_to_file = Path(config).exists()
        except (OSError, ValueError):
            # A JSON string is not necessarily a legal file name (too long,
            # embedded null byte, ...). In that case it cannot be a path, so
            # treat it as JSON content instead of failing here.
            points_to_file = False
        if points_to_file:
            with open(config, "r", encoding="utf-8") as f:
                config = json.load(f)
        else:
            config = json.loads(config)
    return EnvironmentConfig.model_validate(config)

101 

102 

class CustomSimpyEnvironment(simpy.Environment):
    """A customized version of the simpy environment. Common base of the
    instant, realtime and scaled realtime environments defined in this module.

    Subclasses are expected to set ``_config`` and ``logger`` and to implement
    ``pretty_time`` and ``pretty_until``.
    """

    _config: EnvironmentConfig

    @property
    def config(self) -> EnvironmentConfig:
        """Return the config of the environment"""
        return self._config

    @property
    def offset(self) -> float:
        """Start time offset of the environment."""
        return self.config.offset

    @property
    def time(self) -> float:
        """Get the current time of the environment."""
        return self.now

    def clock(self):
        """Process that logs the current time and then waits for ``t_sample``,
        repeated endlessly."""
        while True:
            self.logger.info("Current simulation time: %s", self.pretty_time())
            yield self.timeout(self.config.t_sample)

    def pretty_time(self):
        """Formatted current time. Placeholder that returns None; the
        subclasses in this module provide the implementation."""
        ...

    def pretty_until(self):
        """Formatted end time. Placeholder that returns None; the subclasses
        in this module provide the implementation."""
        ...

134 

135 

class InstantEnvironment(CustomSimpyEnvironment):
    """A customized version of the simpy environment. Handles execution of modules
    processes and manages time for instant execution mode."""

    def __init__(self, *, config: EnvironmentConfig):
        """Create the environment starting at ``config.offset``.

        Args:
            config: Environment config. If ``config.clock`` is set, the logging
                clock process is registered.
        """
        super().__init__(initial_time=config.offset)
        self._config = config
        self._until = UNTIL_UNSET
        # Create an environment-specific logger using CustomLogger
        self.logger = agentlib_logging.create_logger(env=self, name="environment")
        if self.config.clock:
            self.process(self.clock())

    def pretty_time(self) -> str:
        """Returns the time in seconds."""
        return f"{self.time:.2f}s"

    def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]:
        """Remember ``until`` for ``pretty_until`` and run the simpy environment."""
        self._until = until
        return super().run(until=until)

    def pretty_until(self) -> Union[str, None]:
        """Returns the end time in seconds, followed by the progress in percent.

        If no end time is stored (``None`` or the ``UNTIL_UNSET`` sentinel),
        the stored value is returned unchanged. If ``run`` was given an event
        instead of a time, ``None`` is returned because no end time is known.
        """
        until = self._until
        if until is None or until is UNTIL_UNSET:
            return until
        if isinstance(until, Event):
            # An event cannot be formatted as a time or used in the division.
            return None
        end_text = f"{until:.2f}s"
        if not until:
            # until == 0: avoid the division by zero and omit the percentage.
            return end_text
        _percent_finished = round(self.time / until * 100, 1)
        return end_text + f" ({_percent_finished} %)"

163 

164 

class RealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment):
    """A customized version of the simpy environment. Handles execution of modules
    processes and manages time for real time execution mode."""

    def __init__(self, *, config: EnvironmentConfig):
        """Create the realtime environment from ``config``.

        ``offset``, ``factor`` and ``strict`` are forwarded to simpy. Either the
        logging clock or the silent clock is registered, depending on
        ``config.clock``.
        """
        super().__init__(
            initial_time=config.offset, factor=config.factor, strict=config.strict
        )
        self._until = UNTIL_UNSET
        self._config = config
        self.logger = agentlib_logging.create_logger(env=self, name="environment")
        if self.config.clock:
            self.process(self.clock())
        else:
            self.process(self.silent_clock())

    def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]:
        """Call ``sync``, remember ``until`` for ``pretty_until`` and run the
        simpy environment."""
        self.sync()
        self._until = until
        return super().run(until=until)

    @property
    def time(self) -> float:
        """Get the current system time as unix timestamp, with the environment
        offset."""
        return time.time() + self.config.offset

    def pretty_time(self) -> str:
        """Returns the time in a datetime format."""
        return datetime.fromtimestamp(self.time).strftime("%d-%b-%Y %H:%M:%S")

    def pretty_until(self) -> Union[str, None]:
        """Returns the end time in a datetime format, followed by the progress
        in percent.

        If no end time is stored (``None`` or the ``UNTIL_UNSET`` sentinel),
        the stored value is returned unchanged. If ``run`` was given an event
        instead of a time, ``None`` is returned because no end time is known.
        """
        until = self._until
        if until is None or until is UNTIL_UNSET:
            return until
        if isinstance(until, Event):
            # An event cannot be formatted as a time or used in the division.
            return None
        end_text = datetime.fromtimestamp(until).strftime("%d-%b-%Y %H:%M:%S")
        if not until:
            # until == 0: avoid the division by zero and omit the percentage.
            return end_text
        # NOTE(review): ``self.time`` is a unix timestamp here while ``until``
        # is the value handed to simpy's ``run`` - confirm both share the same
        # time base, otherwise the percentage is not meaningful.
        _percent_finished = round(self.time / until * 100, 1)
        return end_text + f" ({_percent_finished} %)"

    def silent_clock(self):
        """A silent clock, which does not log anything."""
        while True:
            yield self.timeout(self.config.t_sample)

210 

211 

class ScaledRealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment):
    """A customized version of the simpy environment. Handles execution of modules
    processes and manages time for scaled real time execution mode."""

    def __init__(self, *, config: EnvironmentConfig):
        """Create the scaled realtime environment from ``config``.

        ``offset``, ``factor`` and ``strict`` are forwarded to simpy. Either the
        logging clock or the silent clock is registered, depending on
        ``config.clock``.
        """
        super().__init__(
            initial_time=config.offset, factor=config.factor, strict=config.strict
        )
        self._config = config
        self._until = UNTIL_UNSET
        self.logger = agentlib_logging.create_logger(env=self, name="environment")
        if self.config.clock:
            self.process(self.clock())
        else:
            self.process(self.silent_clock())

    def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]:
        """Call ``sync``, remember ``until`` for ``pretty_until`` and run the
        simpy environment."""
        self.sync()
        self._until = until
        return super().run(until=until)

    @property
    def time(self) -> float:
        """Get the current time of the environment."""
        return self.now

    def pretty_time(self) -> str:
        """Returns the time in seconds."""
        return f"{self.time:.2f}s"

    def pretty_until(self) -> Union[str, None]:
        """Returns the end time in seconds, followed by the progress in percent.

        If no end time is stored (``None`` or the ``UNTIL_UNSET`` sentinel),
        the stored value is returned unchanged. If ``run`` was given an event
        instead of a time, ``None`` is returned because no end time is known.
        """
        until = self._until
        if until is None or until is UNTIL_UNSET:
            return until
        if isinstance(until, Event):
            # An event cannot be formatted as a time or used in the division.
            return None
        end_text = f"{until:.2f}s"
        if not until:
            # until == 0: avoid the division by zero and omit the percentage.
            return end_text
        _percent_finished = round(self.time / until * 100, 1)
        return end_text + f" ({_percent_finished} %)"

    def silent_clock(self):
        """A silent clock, which does not log anything."""
        while True:
            yield self.timeout(self.config.t_sample)

253 

254 

def monkey_patch_simpy_process():
    """Removes the exception catching in simpy processes. This removes some of simpys
    features that we do not need. In return, it improves debugging and makes error
    messages more concise.

    Replaces ``simpy.Process._resume`` with ``new_resume`` defined below. Only
    ``StopIteration`` (the generator finished) is handled; any other exception
    raised inside a process generator propagates to the caller.
    """
    # Imported locally so that the function does not rely on a module-level import.
    import linecache

    def _describe_frame(frame) -> str:
        """Return filename, line number and function name of a stack frame,
        plus the source line if it can be looked up."""
        filename, name = frame.f_code.co_filename, frame.f_code.co_name
        lineno = frame.f_lineno

        header = f' File "{filename}", line {lineno}, in {name}\n'
        # linecache returns an empty string instead of raising if the source is
        # not available (e.g. code from ``exec`` or an interactive session), so
        # the lookup can never hide the error that is being reported.
        source_line = linecache.getline(filename, lineno, frame.f_globals).strip()
        if source_line:
            return header + f" {source_line}\n"
        return header

    def new_resume(self, event: Event) -> None:
        """Resumes the execution of the process with the value of *event*. If
        the process generator exits, the process itself will get triggered with
        the return value of the generator."""
        # Mark the current process as active.
        self.env._active_proc = self

        while True:
            # Get next event from process
            try:
                if event._ok:
                    event = self._generator.send(event._value)
                else:
                    # The process has no choice but to handle the failed event
                    # (or fail itself).
                    event._defused = True

                    # Create an exclusive copy of the exception for this
                    # process to prevent traceback modifications by other
                    # processes.
                    exc = type(event._value)(*event._value.args)
                    exc.__cause__ = event._value
                    event = self._generator.throw(exc)
            except StopIteration as e:
                # Process has terminated.
                event = None  # type: ignore
                self._ok = True
                self._value = e.args[0] if e.args else None
                self.env.schedule(self)
                break

            # Process returned another event to wait upon.
            try:
                # Be optimistic and blindly access the callbacks attribute.
                if event.callbacks is not None:
                    # The event has not yet been triggered. Register callback
                    # to resume the process if that happens.
                    event.callbacks.append(self._resume)
                    break
            except AttributeError:
                # Our optimism didn't work out, figure out what went wrong and
                # inform the user.
                if hasattr(event, "callbacks"):
                    raise

                msg = f'Invalid yield value "{event}"'
                descr = _describe_frame(self._generator.gi_frame)
                raise RuntimeError(f"\n{descr}{msg}") from None

            # Reaching this point means the yielded event was already
            # processed (callbacks is None), so the loop continues and feeds
            # its value into the generator right away.

        self._target = event
        self.env._active_proc = None

    simpy.Process._resume = new_resume

328 

329 

# Apply the patch as a side effect of importing this module.
monkey_patch_simpy_process()