Coverage for agentlib/core/environment.py: 74%

144 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-07-21 13:38 +0000

1"""This module contains the Environment class, used by all Agents and Modules. 

2 

3This module contains modified code of simpy (https://gitlab.com/team-simpy/simpy). 

4Simpy is distributed under the MIT License 

5 

6 The MIT License (MIT) 

7 

8 Copyright (c) 2013 Ontje Lünsdorf and Stefan Scherfke (also see AUTHORS.txt) 

9 

10 Permission is hereby granted, free of charge, to any person obtaining a copy of 

11 this software and associated documentation files (the "Software"), to deal in 

12 the Software without restriction, including without limitation the rights to 

13 use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of 

14 the Software, and to permit persons to whom the Software is furnished to do so, 

15 subject to the following conditions: 

16 

17 The above copyright notice and this permission notice shall be included in all 

18 copies or substantial portions of the Software. 

19 

20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS 

22 FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR 

23 COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER 

24 IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 

25 CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 

26""" 

27 

28import json 

29import time 

30from datetime import datetime 

31from pathlib import Path 

32from typing import Union, Any, Optional 

33 

34import simpy 

35from pydantic import ( 

36 ConfigDict, 

37 PositiveFloat, 

38 BaseModel, 

39 Field, 

40) 

41from simpy.core import SimTime, Event 

42 

43from agentlib.core import logging_ as agentlib_logging 

44 

45UNTIL_UNSET = object() # sentinel value to check if an until value has been set 

46 

47 

48class EnvironmentConfig(BaseModel): 

49 """Config for the Environment""" 

50 

51 rt: bool = False 

52 factor: PositiveFloat = 1.0 

53 strict: bool = False 

54 t_sample: PositiveFloat = Field( 

55 title="t_sample", 

56 default=60, 

57 description="Used to increase the now-time of" 

58 "the environment using the clock function.", 

59 ) 

60 offset: float = Field( 

61 title="offset", 

62 default=0, 

63 description="Used to offset the now-time of" 

64 "the environment using the clock function.", 

65 ) 

66 clock: bool = False 

67 model_config = ConfigDict( 

68 validate_assignment=True, arbitrary_types_allowed=True, extra="forbid" 

69 ) 

70 

71 

72class Environment: 

73 """Simpy Environment Distributor. Handles synchronous processes.""" 

74 

75 def __new__( 

76 cls, *args, **kwargs 

77 ) -> Union["RealtimeEnvironment", "InstantEnvironment"]: 

78 config = make_env_config(kwargs.get("config")) 

79 if not config.rt: 

80 return InstantEnvironment(config=config) 

81 if config.factor == 1: 

82 return RealtimeEnvironment(config=config) 

83 return ScaledRealtimeEnvironment(config=config) 

84 

85 

86def make_env_config( 

87 config: Union[dict, EnvironmentConfig, str, None], 

88) -> EnvironmentConfig: 

89 """Creates the environment config from different sources.""" 

90 if config is None: 

91 return EnvironmentConfig() 

92 if isinstance(config, EnvironmentConfig): 

93 return config 

94 if isinstance(config, (str, Path)): 

95 if Path(config).exists(): 

96 with open(config, "r") as f: 

97 config = json.load(f) 

98 else: 

99 config = json.loads(config) 

100 return EnvironmentConfig.model_validate(config) 

101 

102 

103class CustomSimpyEnvironment(simpy.Environment): 

104 """A customized version of the simpy environment. Handles execution of modules 

105 processes and manages time for instant execution mode.""" 

106 

107 _config: EnvironmentConfig 

108 

109 @property 

110 def config(self) -> EnvironmentConfig: 

111 """Return the config of the environment""" 

112 return self._config 

113 

114 @property 

115 def offset(self) -> EnvironmentConfig: 

116 """Start time offset of the environment.""" 

117 return self.config.offset 

118 

119 @property 

120 def time(self) -> float: 

121 """Get the current time of the environment.""" 

122 return self.now 

123 

124 def clock(self): 

125 """Define a clock loop to increase the now-timer every other second 

126 (Or whatever t_sample is)""" 

127 while True: 

128 self.logger.info("Current simulation time: %s", self.pretty_time()) 

129 yield self.timeout(self.config.t_sample) 

130 

131 def pretty_time(self): ... 

132 

133 def pretty_until(self) -> Optional[float]: 

134 return None 

135 

136 def calculate_percentage_for_pretty_until(self) -> float: 

137 """Calculate the percentage of completion, accounting for offset.""" 

138 # Account for offset in percentage calculation 

139 simulation_elapsed = self.time - self.config.offset 

140 simulation_total = self._until - self.config.offset 

141 # Avoid division by zero 

142 if simulation_total <= 0: 

143 return 0.0 

144 return round(simulation_elapsed / simulation_total * 100, 1) 

145 

146 

147class InstantEnvironment(CustomSimpyEnvironment): 

148 """A customized version of the simpy environment. Handles execution of modules 

149 processes and manages time for instant execution mode.""" 

150 

151 def __init__(self, *, config: EnvironmentConfig): 

152 super().__init__(initial_time=config.offset) 

153 self._config = config 

154 self._until = UNTIL_UNSET 

155 # Create an environment-specific logger using CustomLogger 

156 self.logger = agentlib_logging.create_logger(env=self, name="environment") 

157 if self.config.clock: 

158 self.process(self.clock()) 

159 

160 def pretty_time(self) -> str: 

161 """Returns the time in seconds.""" 

162 return f"{self.time:.2f}s" 

163 

164 def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]: 

165 self._until = until 

166 return super().run(until=until) 

167 

168 def pretty_until(self) -> Union[str, None]: 

169 """Returns the time in seconds.""" 

170 if self._until is None or self._until is UNTIL_UNSET: 

171 return self._until 

172 percent_finished = self.calculate_percentage_for_pretty_until() 

173 return f"{self._until:.2f}s ({percent_finished} %)" 

174 

175 

176class RealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment): 

177 """A customized version of the simpy environment. Handles execution of modules 

178 processes and manages time for real time execution mode.""" 

179 

180 def __init__(self, *, config: EnvironmentConfig): 

181 super().__init__( 

182 initial_time=config.offset, factor=config.factor, strict=config.strict 

183 ) 

184 self._until = UNTIL_UNSET 

185 self._config = config 

186 self.logger = agentlib_logging.create_logger(env=self, name="environment") 

187 if self.config.clock: 

188 self.process(self.clock()) 

189 else: 

190 self.process(self.silent_clock()) 

191 

192 def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]: 

193 self.sync() 

194 self._until = until 

195 return super().run(until=until) 

196 

197 @property 

198 def time(self) -> float: 

199 """Get the current system time as unix timestamp, with the enivronement 

200 offset.""" 

201 return time.time() + self.config.offset 

202 

203 def pretty_time(self) -> str: 

204 """Returns the time in a datetime format.""" 

205 return datetime.fromtimestamp(self.time).strftime("%d-%b-%Y %H:%M:%S") 

206 

207 def silent_clock(self): 

208 """A silent clock, which does not log anything.""" 

209 while True: 

210 yield self.timeout(self.config.t_sample) 

211 

212 

213class ScaledRealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment): 

214 """A customized version of the simpy environment. Handles execution of modules 

215 processes and manages time for scaled real time execution mode.""" 

216 

217 def __init__(self, *, config: EnvironmentConfig): 

218 super().__init__( 

219 initial_time=config.offset, factor=config.factor, strict=config.strict 

220 ) 

221 self._config = config 

222 self._until = UNTIL_UNSET 

223 self.logger = agentlib_logging.create_logger(env=self, name="environment") 

224 if self.config.clock: 

225 self.process(self.clock()) 

226 else: 

227 self.process(self.silent_clock()) 

228 

229 def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]: 

230 self.sync() 

231 self._until = until 

232 return super().run(until=until) 

233 

234 @property 

235 def time(self) -> float: 

236 """Get the current time of the environment.""" 

237 return self.now 

238 

239 def pretty_time(self) -> str: 

240 """Returns the time in seconds.""" 

241 return f"{self.time:.2f}s" 

242 

243 def silent_clock(self): 

244 """A silent clock, which does not log anything.""" 

245 while True: 

246 yield self.timeout(self.config.t_sample) 

247 

248 

249def monkey_patch_simpy_process(): 

250 """Removes the exception catching in simpy processes. This removes some of simpys 

251 features that we do not need. In return, it improves debugging and makes error 

252 messages more concise. 

253 """ 

254 

255 def _describe_frame(frame) -> str: 

256 """Print filename, line number and function name of a stack frame.""" 

257 filename, name = frame.f_code.co_filename, frame.f_code.co_name 

258 lineno = frame.f_lineno 

259 

260 with open(filename) as f: 

261 for no, line in enumerate(f): 

262 if no + 1 == lineno: 

263 return ( 

264 f' File "{filename}", line {lineno}, in {name}\n' 

265 f" {line.strip()}\n" 

266 ) 

267 return f' File "{filename}", line {lineno}, in {name}\n' 

268 

269 def new_resume(self, event: Event) -> None: 

270 """Resumes the execution of the process with the value of *event*. If 

271 the process generator exits, the process itself will get triggered with 

272 the return value or the exception of the generator.""" 

273 # Mark the current process as active. 

274 self.env._active_proc = self 

275 

276 while True: 

277 # Get next event from process 

278 try: 

279 if event._ok: 

280 event = self._generator.send(event._value) 

281 else: 

282 # The process has no choice but to handle the failed event 

283 # (or fail itself). 

284 event._defused = True 

285 

286 # Create an exclusive copy of the exception for this 

287 # process to prevent traceback modifications by other 

288 # processes. 

289 exc = type(event._value)(*event._value.args) 

290 exc.__cause__ = event._value 

291 event = self._generator.throw(exc) 

292 except StopIteration as e: 

293 # Process has terminated. 

294 event = None # type: ignore 

295 self._ok = True 

296 self._value = e.args[0] if len(e.args) else None 

297 self.env.schedule(self) 

298 break 

299 

300 # Process returned another event to wait upon. 

301 try: 

302 # Be optimistic and blindly access the callbacks attribute. 

303 if event.callbacks is not None: 

304 # The event has not yet been triggered. Register callback 

305 # to resume the process if that happens. 

306 event.callbacks.append(self._resume) 

307 break 

308 except AttributeError: 

309 # Our optimism didn't work out, figure out what went wrong and 

310 # inform the user. 

311 if hasattr(event, "callbacks"): 

312 raise 

313 

314 msg = f'Invalid yield value "{event}"' 

315 descr = _describe_frame(self._generator.gi_frame) 

316 raise RuntimeError(f"\n{descr}{msg}") from None 

317 

318 self._target = event 

319 self.env._active_proc = None 

320 

321 simpy.Process._resume = new_resume 

322 

323 

324monkey_patch_simpy_process()