Coverage for agentlib/core/environment.py: 75%
147 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-30 13:00 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-30 13:00 +0000
1"""This module contains the Environment class, used by all Agents and Modules.
3This module contains modified code of simpy (https://gitlab.com/team-simpy/simpy).
4Simpy is distributed under the MIT License
6 The MIT License (MIT)
8 Copyright (c) 2013 Ontje Lünsdorf and Stefan Scherfke (also see AUTHORS.txt)
10 Permission is hereby granted, free of charge, to any person obtaining a copy of
11 this software and associated documentation files (the "Software"), to deal in
12 the Software without restriction, including without limitation the rights to
13 use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
14 the Software, and to permit persons to whom the Software is furnished to do so,
15 subject to the following conditions:
17 The above copyright notice and this permission notice shall be included in all
18 copies or substantial portions of the Software.
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
22 FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
23 COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
24 IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
25 CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26"""
28import json
29import time
30from datetime import datetime
31from pathlib import Path
32from typing import Union, Any, Optional
34import simpy
35from pydantic import (
36 ConfigDict,
37 PositiveFloat,
38 BaseModel,
39 Field,
40)
41from simpy.core import SimTime, Event
43from agentlib.core import logging_ as agentlib_logging
# Sentinel value to check if an until value has been set. The environments
# store it in ``_until`` at construction time and overwrite it in ``run``;
# because ``run(until=None)`` is a legal call, ``None`` cannot serve as the
# "not set yet" marker.
UNTIL_UNSET = object()
class EnvironmentConfig(BaseModel):
    """Config for the Environment.

    The fields are read by ``Environment.__new__`` (to pick the concrete
    environment class) and by the environment classes themselves.
    """

    # When False an InstantEnvironment is built, otherwise one of the
    # real-time environments.
    rt: bool = False
    # Forwarded as ``factor`` to the real-time environments. A value of
    # exactly 1 selects RealtimeEnvironment, any other value selects
    # ScaledRealtimeEnvironment.
    factor: PositiveFloat = 1.0
    # Forwarded as ``strict`` to the real-time environments.
    strict: bool = False
    # Delay passed to ``timeout`` between two ticks of the clock processes.
    t_sample: PositiveFloat = Field(
        title="t_sample",
        default=60,
        # The trailing space matters: the two literals are concatenated.
        description="Used to increase the now-time of "
        "the environment using the clock function.",
    )
    # Used as ``initial_time`` of the environment; RealtimeEnvironment also
    # adds it to the wall-clock time it reports.
    offset: float = Field(
        title="offset",
        default=0,
        # The trailing space matters: the two literals are concatenated.
        description="Used to offset the now-time of "
        "the environment using the clock function.",
    )
    # If True, the logging ``clock`` process is started by the environment.
    clock: bool = False
    # Re-validate on attribute assignment and reject unknown fields.
    model_config = ConfigDict(
        validate_assignment=True, arbitrary_types_allowed=True, extra="forbid"
    )
class Environment:
    """Simpy Environment Distributor. Handles synchronous processes.

    Instantiating this class never yields an ``Environment`` instance.
    ``__new__`` inspects the config and returns one of the concrete
    environments instead:

    * ``rt`` is False -> ``InstantEnvironment``
    * ``rt`` is True and ``factor == 1`` -> ``RealtimeEnvironment``
    * ``rt`` is True and ``factor != 1`` -> ``ScaledRealtimeEnvironment``
    """

    def __new__(
        cls, *args, **kwargs
    ) -> Union[
        "InstantEnvironment", "RealtimeEnvironment", "ScaledRealtimeEnvironment"
    ]:
        """Build the environment matching the given config.

        Args:
            *args: Optionally the config as first positional argument. It is
                only consulted when no ``config`` keyword is given.
            **kwargs: ``config`` may be anything ``make_env_config`` accepts
                (None, dict, EnvironmentConfig, JSON string or file path).

        Returns:
            The concrete environment instance selected by ``rt`` / ``factor``.
        """
        if "config" in kwargs:
            raw_config = kwargs["config"]
        elif args:
            # A positionally passed config was previously ignored, which
            # silently produced an environment with default settings.
            raw_config = args[0]
        else:
            raw_config = None
        config = make_env_config(raw_config)
        if not config.rt:
            return InstantEnvironment(config=config)
        if config.factor == 1:
            return RealtimeEnvironment(config=config)
        return ScaledRealtimeEnvironment(config=config)
def make_env_config(
    config: Union[dict, EnvironmentConfig, str, Path, None],
) -> EnvironmentConfig:
    """Creates the environment config from different sources.

    Args:
        config: One of

            * ``None`` -> a default ``EnvironmentConfig`` is returned,
            * an ``EnvironmentConfig`` -> returned unchanged,
            * a ``str`` / ``Path`` naming an existing file -> the file is
              parsed as JSON,
            * any other ``str`` -> parsed as a JSON document,
            * a ``dict`` -> validated directly.

    Returns:
        The validated ``EnvironmentConfig``.

    Raises:
        FileNotFoundError: If a ``Path`` is given that does not exist.
        json.JSONDecodeError: If the string / file content is not valid JSON.
        pydantic.ValidationError: If the data does not fit the config model.
    """
    if config is None:
        return EnvironmentConfig()
    if isinstance(config, EnvironmentConfig):
        return config
    if isinstance(config, (str, Path)):
        try:
            is_file = Path(config).exists()
        except (OSError, ValueError):
            # An inline JSON document is usually not a valid file name (too
            # long, embedded null byte, ...). Treat it as "not a file"
            # instead of letting the file-system error escape.
            is_file = False
        if is_file:
            with open(config, "r") as f:
                config = json.load(f)
        elif isinstance(config, Path):
            # json.loads cannot take a Path; report the real problem.
            raise FileNotFoundError(f"Environment config file not found: {config}")
        else:
            config = json.loads(config)
    return EnvironmentConfig.model_validate(config)
class CustomSimpyEnvironment(simpy.Environment):
    """Common base of the customized simpy environments.

    Provides access to the config, the current time and a logging clock
    process. Subclasses are expected to set ``_config`` and ``logger`` and to
    implement ``pretty_time`` / ``pretty_until``.
    """

    _config: EnvironmentConfig

    @property
    def config(self) -> EnvironmentConfig:
        """Return the config of the environment"""
        return self._config

    @property
    def offset(self) -> float:
        """Start time offset of the environment (``config.offset``)."""
        return self.config.offset

    @property
    def time(self) -> float:
        """Get the current time of the environment."""
        return self.now

    def clock(self):
        """Generator for a simpy process that logs the current time.

        Logs ``pretty_time()`` at info level, then waits ``config.t_sample``
        before the next iteration. Runs forever.
        """
        while True:
            self.logger.info("Current simulation time: %s", self.pretty_time())
            yield self.timeout(self.config.t_sample)

    def pretty_time(self):
        """Human-readable current time; placeholder overridden by subclasses."""
        ...

    def pretty_until(self):
        """Human-readable end time; placeholder overridden by subclasses."""
        ...
class InstantEnvironment(CustomSimpyEnvironment):
    """A customized version of the simpy environment. Handles execution of modules
    processes and manages time for instant execution mode."""

    def __init__(self, *, config: EnvironmentConfig):
        """Set up the environment, starting at ``config.offset``.

        Args:
            config: Environment settings. If ``config.clock`` is set, the
                logging clock process is started.
        """
        super().__init__(initial_time=config.offset)
        self._config = config
        self._until = UNTIL_UNSET
        # Create an environment-specific logger using CustomLogger
        self.logger = agentlib_logging.create_logger(env=self, name="environment")
        if self.config.clock:
            self.process(self.clock())

    def pretty_time(self) -> str:
        """Returns the time in seconds."""
        return f"{self.time:.2f}s"

    def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]:
        """Remember ``until`` for ``pretty_until`` and run the simulation."""
        self._until = until
        return super().run(until=until)

    def pretty_until(self) -> Union[str, None]:
        """Returns the end time in seconds, with the progress in percent.

        Returns:
            * ``self._until`` unchanged while it is ``None`` or still the
              ``UNTIL_UNSET`` sentinel,
            * ``str(until)`` if ``run`` was given an event,
            * otherwise ``"<until>s (<percent> %)"``; the percentage is
              omitted when ``until`` is 0.
        """
        until = self._until
        if until is None or until is UNTIL_UNSET:
            return until
        if isinstance(until, Event):
            # An event has no numeric end time; formatting it with ``.2f`` or
            # dividing by it would raise a TypeError.
            return str(until)
        if not until:
            # Avoid a ZeroDivisionError in the progress computation.
            return f"{until:.2f}s"
        _percent_finished = round(self.time / until * 100, 1)
        return f"{until:.2f}s" + f" ({_percent_finished} %)"
class RealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment):
    """A customized version of the simpy environment. Handles execution of modules
    processes and manages time for real time execution mode."""

    def __init__(self, *, config: EnvironmentConfig):
        """Set up the real-time environment.

        Args:
            config: Environment settings; ``offset``, ``factor`` and
                ``strict`` are forwarded to simpy's real-time environment.
        """
        super().__init__(
            initial_time=config.offset, factor=config.factor, strict=config.strict
        )
        self._until = UNTIL_UNSET
        self._config = config
        self.logger = agentlib_logging.create_logger(env=self, name="environment")
        if self.config.clock:
            self.process(self.clock())
        else:
            # Presumably keeps at least one event scheduled so that ``run``
            # does not return early -- TODO confirm.
            self.process(self.silent_clock())

    def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]:
        """Call ``sync()``, remember ``until`` and run the simulation."""
        self.sync()
        self._until = until
        return super().run(until=until)

    @property
    def time(self) -> float:
        """Get the current system time as unix timestamp, with the environment
        offset."""
        return time.time() + self.config.offset

    def pretty_time(self) -> str:
        """Returns the time in a datetime format."""
        return datetime.fromtimestamp(self.time).strftime("%d-%b-%Y %H:%M:%S")

    def pretty_until(self) -> Union[str, None]:
        """Returns the end time in a datetime format, with progress in percent.

        Returns:
            * ``self._until`` unchanged while it is ``None`` or still the
              ``UNTIL_UNSET`` sentinel,
            * ``str(until)`` if ``run`` was given an event,
            * otherwise the formatted date followed by ``" (<percent> %)"``;
              the percentage is omitted when ``until`` is 0.
        """
        until = self._until
        if until is None or until is UNTIL_UNSET:
            return until
        if isinstance(until, Event):
            # An event cannot be converted to a timestamp or divided by.
            return str(until)
        end_time = datetime.fromtimestamp(until).strftime("%d-%b-%Y %H:%M:%S")
        if not until:
            # Avoid a ZeroDivisionError in the progress computation.
            return end_time
        # NOTE(review): ``self.time`` is wall-clock based here, while
        # ``until`` is the value handed to ``run`` -- confirm that both are
        # meant to be on the same scale.
        _percent_finished = round(self.time / until * 100, 1)
        return end_time + f" ({_percent_finished} %)"

    def silent_clock(self):
        """A silent clock, which does not log anything."""
        while True:
            yield self.timeout(self.config.t_sample)
class ScaledRealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment):
    """A customized version of the simpy environment. Handles execution of modules
    processes and manages time for scaled real time execution mode."""

    def __init__(self, *, config: EnvironmentConfig):
        """Set up the scaled real-time environment.

        Args:
            config: Environment settings; ``offset``, ``factor`` and
                ``strict`` are forwarded to simpy's real-time environment.
        """
        super().__init__(
            initial_time=config.offset, factor=config.factor, strict=config.strict
        )
        self._config = config
        self._until = UNTIL_UNSET
        self.logger = agentlib_logging.create_logger(env=self, name="environment")
        if self.config.clock:
            self.process(self.clock())
        else:
            # Presumably keeps at least one event scheduled so that ``run``
            # does not return early -- TODO confirm.
            self.process(self.silent_clock())

    def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]:
        """Call ``sync()``, remember ``until`` and run the simulation."""
        self.sync()
        self._until = until
        return super().run(until=until)

    @property
    def time(self) -> float:
        """Get the current time of the environment."""
        return self.now

    def pretty_time(self) -> str:
        """Returns the time in seconds."""
        return f"{self.time:.2f}s"

    def pretty_until(self) -> Union[str, None]:
        """Returns the end time in seconds, with the progress in percent.

        Returns:
            * ``self._until`` unchanged while it is ``None`` or still the
              ``UNTIL_UNSET`` sentinel,
            * ``str(until)`` if ``run`` was given an event,
            * otherwise ``"<until>s (<percent> %)"``; the percentage is
              omitted when ``until`` is 0.
        """
        until = self._until
        if until is None or until is UNTIL_UNSET:
            return until
        if isinstance(until, Event):
            # An event has no numeric end time; formatting it with ``.2f`` or
            # dividing by it would raise a TypeError.
            return str(until)
        if not until:
            # Avoid a ZeroDivisionError in the progress computation.
            return f"{until:.2f}s"
        _percent_finished = round(self.time / until * 100, 1)
        return f"{until:.2f}s" + f" ({_percent_finished} %)"

    def silent_clock(self):
        """A silent clock, which does not log anything."""
        while True:
            yield self.timeout(self.config.t_sample)
def monkey_patch_simpy_process():
    """Removes the exception catching in simpy processes. This removes some of simpys
    features that we do not need. In return, it improves debugging and makes error
    messages more concise.

    Replaces ``simpy.Process._resume`` with ``new_resume`` defined below.
    Only ``StopIteration`` is caught around the generator call, so any other
    exception raised by a process generator propagates out of ``_resume``.
    """
    # Local import: only needed to build the error message for invalid yields.
    import linecache

    def _describe_frame(frame) -> str:
        """Return filename, line number and function name of a stack frame,
        followed by the source line if it can be looked up."""
        filename, name = frame.f_code.co_filename, frame.f_code.co_name
        lineno = frame.f_lineno

        location = f' File "{filename}", line {lineno}, in {name}\n'
        # linecache returns '' instead of raising when the source cannot be
        # read, so a missing file no longer masks the RuntimeError that this
        # description is built for.
        source_line = linecache.getline(filename, lineno, frame.f_globals)
        if source_line:
            return location + f" {source_line.strip()}\n"
        return location

    def new_resume(self, event: Event) -> None:
        """Resumes the execution of the process with the value of *event*. If
        the process generator exits, the process itself will get triggered with
        the return value of the generator."""
        # Mark the current process as active.
        self.env._active_proc = self

        while True:
            # Get next event from process
            try:
                if event._ok:
                    event = self._generator.send(event._value)
                else:
                    # The process has no choice but to handle the failed event
                    # (or fail itself).
                    event._defused = True

                    # Create an exclusive copy of the exception for this
                    # process to prevent traceback modifications by other
                    # processes.
                    exc = type(event._value)(*event._value.args)
                    exc.__cause__ = event._value
                    event = self._generator.throw(exc)
            except StopIteration as e:
                # Process has terminated.
                event = None  # type: ignore
                self._ok = True
                self._value = e.args[0] if e.args else None
                self.env.schedule(self)
                break

            # Process returned another event to wait upon.
            try:
                # Be optimistic and blindly access the callbacks attribute.
                if event.callbacks is not None:
                    # The event has not yet been triggered. Register callback
                    # to resume the process if that happens.
                    event.callbacks.append(self._resume)
                    break
                # callbacks is None: the event was already processed, so loop
                # again and feed its value to the generator right away.
            except AttributeError:
                # Our optimism didn't work out, figure out what went wrong and
                # inform the user.
                if hasattr(event, "callbacks"):
                    raise

                msg = f'Invalid yield value "{event}"'
                descr = _describe_frame(self._generator.gi_frame)
                raise RuntimeError(f"\n{descr}{msg}") from None

        self._target = event
        self.env._active_proc = None

    simpy.Process._resume = new_resume


# Apply the patch once, at import time of this module.
monkey_patch_simpy_process()