Coverage for agentlib/core/environment.py: 77%

120 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1"""This module contains the Environment class, used by all Agents and Modules. 

2 

3This module contains modified code of simpy (https://gitlab.com/team-simpy/simpy). 

4Simpy is distributed under the MIT License 

5 

6 The MIT License (MIT) 

7 

8 Copyright (c) 2013 Ontje Lünsdorf and Stefan Scherfke (also see AUTHORS.txt) 

9 

10 Permission is hereby granted, free of charge, to any person obtaining a copy of 

11 this software and associated documentation files (the "Software"), to deal in 

12 the Software without restriction, including without limitation the rights to 

13 use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of 

14 the Software, and to permit persons to whom the Software is furnished to do so, 

15 subject to the following conditions: 

16 

17 The above copyright notice and this permission notice shall be included in all 

18 copies or substantial portions of the Software. 

19 

20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS 

22 FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR 

23 COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER 

24 IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 

25 CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 

26""" 

27 

28import json 

29import logging 

30import time 

31from datetime import datetime 

32from pathlib import Path 

33from typing import Union, Any, Optional 

34 

35import simpy 

36from pydantic import ( 

37 ConfigDict, 

38 PositiveFloat, 

39 BaseModel, 

40 Field, 

41) 

42from simpy.core import SimTime, Event 

43 

44logger = logging.getLogger(name=__name__) 

45 

46 

47class EnvironmentConfig(BaseModel): 

48 """Config for the Environment""" 

49 

50 rt: bool = False 

51 factor: PositiveFloat = 1.0 

52 strict: bool = False 

53 t_sample: PositiveFloat = Field( 

54 title="t_sample", 

55 default=60, 

56 description="Used to increase the now-time of" 

57 "the environment using the clock function.", 

58 ) 

59 offset: float = Field( 

60 title="offset", 

61 default=0, 

62 description="Used to offset the now-time of" 

63 "the environment using the clock function.", 

64 ) 

65 clock: bool = False 

66 model_config = ConfigDict( 

67 validate_assignment=True, arbitrary_types_allowed=True, extra="forbid" 

68 ) 

69 

70 

71class Environment: 

72 """Simpy Environment Distributor. Handles synchronous processes.""" 

73 

74 def __new__( 

75 cls, *args, **kwargs 

76 ) -> Union["RealtimeEnvironment", "InstantEnvironment"]: 

77 config = make_env_config(kwargs.get("config")) 

78 if not config.rt: 

79 return InstantEnvironment(config=config) 

80 if config.factor == 1: 

81 return RealtimeEnvironment(config=config) 

82 return ScaledRealtimeEnvironment(config=config) 

83 

84 

85def make_env_config( 

86 config: Union[dict, EnvironmentConfig, str, None], 

87) -> EnvironmentConfig: 

88 """Creates the environment config from different sources.""" 

89 if config is None: 

90 return EnvironmentConfig() 

91 if isinstance(config, EnvironmentConfig): 

92 return config 

93 if isinstance(config, (str, Path)): 

94 if Path(config).exists(): 

95 with open(config, "r") as f: 

96 config = json.load(f) 

97 else: 

98 config = json.loads(config) 

99 return EnvironmentConfig.model_validate(config) 

100 

101 

102class CustomSimpyEnvironment(simpy.Environment): 

103 """A customized version of the simpy environment. Handles execution of modules 

104 processes and manages time for instant execution mode.""" 

105 

106 _config: EnvironmentConfig 

107 

108 @property 

109 def config(self) -> EnvironmentConfig: 

110 """Return the config of the environment""" 

111 return self._config 

112 

113 @property 

114 def offset(self) -> EnvironmentConfig: 

115 """Start time offset of the environment.""" 

116 return self.config.offset 

117 

118 @property 

119 def time(self) -> float: 

120 """Get the current time of the environment.""" 

121 return self.now 

122 

123 def clock(self): 

124 """Define a clock loop to increase the now-timer every other second 

125 (Or whatever t_sample is)""" 

126 while True: 

127 logger.info("Current simulation time: %s", self.pretty_time()) 

128 yield self.timeout(self.config.t_sample) 

129 

130 def pretty_time(self): ... 

131 

132 

133class InstantEnvironment(CustomSimpyEnvironment): 

134 """A customized version of the simpy environment. Handles execution of modules 

135 processes and manages time for instant execution mode.""" 

136 

137 def __init__(self, *, config: EnvironmentConfig): 

138 super().__init__(initial_time=config.offset) 

139 self._config = config 

140 if self.config.clock: 

141 self.process(self.clock()) 

142 

143 def pretty_time(self) -> str: 

144 """Returns the time in seconds.""" 

145 return f"{self.time:.2f}s" 

146 

147 

148class RealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment): 

149 """A customized version of the simpy environment. Handles execution of modules 

150 processes and manages time for real time execution mode.""" 

151 

152 def __init__(self, *, config: EnvironmentConfig): 

153 super().__init__( 

154 initial_time=config.offset, factor=config.factor, strict=config.strict 

155 ) 

156 self._config = config 

157 if self.config.clock: 

158 self.process(self.clock()) 

159 else: 

160 self.process(self.silent_clock()) 

161 

162 def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]: 

163 self.sync() 

164 return super().run(until=until) 

165 

166 @property 

167 def time(self) -> float: 

168 """Get the current system time as unix timestamp, with the enivronement 

169 offset.""" 

170 return time.time() + self.config.offset 

171 

172 def pretty_time(self) -> str: 

173 """Returns the time in a datetime format.""" 

174 return datetime.fromtimestamp(self.time).strftime("%d-%b-%Y %H:%M:%S") 

175 

176 def silent_clock(self): 

177 """A silent clock, which does not log anything.""" 

178 while True: 

179 yield self.timeout(self.config.t_sample) 

180 

181 

182class ScaledRealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment): 

183 """A customized version of the simpy environment. Handles execution of modules 

184 processes and manages time for scaled real time execution mode.""" 

185 

186 def __init__(self, *, config: EnvironmentConfig): 

187 super().__init__( 

188 initial_time=config.offset, factor=config.factor, strict=config.strict 

189 ) 

190 self._config = config 

191 if self.config.clock: 

192 self.process(self.clock()) 

193 else: 

194 self.process(self.silent_clock()) 

195 

196 def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]: 

197 self.sync() 

198 return super().run(until=until) 

199 

200 @property 

201 def time(self) -> float: 

202 """Get the current time of the environment.""" 

203 return self.now 

204 

205 def pretty_time(self) -> str: 

206 """Returns the time in seconds.""" 

207 return f"{self.time:.2f}s" 

208 

209 def silent_clock(self): 

210 """A silent clock, which does not log anything.""" 

211 while True: 

212 yield self.timeout(self.config.t_sample) 

213 

214 

215def monkey_patch_simpy_process(): 

216 """Removes the exception catching in simpy processes. This removes some of simpys 

217 features that we do not need. In return, it improves debugging and makes error 

218 messages more concise. 

219 """ 

220 

221 def _describe_frame(frame) -> str: 

222 """Print filename, line number and function name of a stack frame.""" 

223 filename, name = frame.f_code.co_filename, frame.f_code.co_name 

224 lineno = frame.f_lineno 

225 

226 with open(filename) as f: 

227 for no, line in enumerate(f): 

228 if no + 1 == lineno: 

229 return ( 

230 f' File "{filename}", line {lineno}, in {name}\n' 

231 f" {line.strip()}\n" 

232 ) 

233 return f' File "{filename}", line {lineno}, in {name}\n' 

234 

235 def new_resume(self, event: Event) -> None: 

236 """Resumes the execution of the process with the value of *event*. If 

237 the process generator exits, the process itself will get triggered with 

238 the return value or the exception of the generator.""" 

239 # Mark the current process as active. 

240 self.env._active_proc = self 

241 

242 while True: 

243 # Get next event from process 

244 try: 

245 if event._ok: 

246 event = self._generator.send(event._value) 

247 else: 

248 # The process has no choice but to handle the failed event 

249 # (or fail itself). 

250 event._defused = True 

251 

252 # Create an exclusive copy of the exception for this 

253 # process to prevent traceback modifications by other 

254 # processes. 

255 exc = type(event._value)(*event._value.args) 

256 exc.__cause__ = event._value 

257 event = self._generator.throw(exc) 

258 except StopIteration as e: 

259 # Process has terminated. 

260 event = None # type: ignore 

261 self._ok = True 

262 self._value = e.args[0] if len(e.args) else None 

263 self.env.schedule(self) 

264 break 

265 

266 # Process returned another event to wait upon. 

267 try: 

268 # Be optimistic and blindly access the callbacks attribute. 

269 if event.callbacks is not None: 

270 # The event has not yet been triggered. Register callback 

271 # to resume the process if that happens. 

272 event.callbacks.append(self._resume) 

273 break 

274 except AttributeError: 

275 # Our optimism didn't work out, figure out what went wrong and 

276 # inform the user. 

277 if hasattr(event, "callbacks"): 

278 raise 

279 

280 msg = f'Invalid yield value "{event}"' 

281 descr = _describe_frame(self._generator.gi_frame) 

282 raise RuntimeError(f"\n{descr}{msg}") from None 

283 

284 self._target = event 

285 self.env._active_proc = None 

286 

287 simpy.Process._resume = new_resume 

288 

289 

290monkey_patch_simpy_process()