Coverage for agentlib/core/environment.py: 74%
144 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-07-21 13:38 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-07-21 13:38 +0000
1"""This module contains the Environment class, used by all Agents and Modules.
3This module contains modified code of simpy (https://gitlab.com/team-simpy/simpy).
4Simpy is distributed under the MIT License
6 The MIT License (MIT)
8 Copyright (c) 2013 Ontje Lünsdorf and Stefan Scherfke (also see AUTHORS.txt)
10 Permission is hereby granted, free of charge, to any person obtaining a copy of
11 this software and associated documentation files (the "Software"), to deal in
12 the Software without restriction, including without limitation the rights to
13 use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
14 the Software, and to permit persons to whom the Software is furnished to do so,
15 subject to the following conditions:
17 The above copyright notice and this permission notice shall be included in all
18 copies or substantial portions of the Software.
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
22 FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
23 COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
24 IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
25 CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26"""
28import json
29import time
30from datetime import datetime
31from pathlib import Path
32from typing import Union, Any, Optional
34import simpy
35from pydantic import (
36 ConfigDict,
37 PositiveFloat,
38 BaseModel,
39 Field,
40)
41from simpy.core import SimTime, Event
43from agentlib.core import logging_ as agentlib_logging
# Sentinel value to check if an until value has been set. The environments in this
# module store it in ``_until`` on construction; a dedicated object is used because
# ``None`` is itself a possible ``until`` argument of ``run``.
UNTIL_UNSET = object()
class EnvironmentConfig(BaseModel):
    """Config for the Environment.

    Unknown keys are rejected (``extra="forbid"``) and values are validated
    again on attribute assignment (``validate_assignment=True``).
    """

    # True selects a realtime environment, False the instant one.
    rt: bool = False
    # Forwarded as ``factor`` to the realtime environments.
    factor: PositiveFloat = 1.0
    # Forwarded as ``strict`` to the realtime environments.
    strict: bool = False
    t_sample: PositiveFloat = Field(
        title="t_sample",
        default=60,
        # The trailing space matters: adjacent literals are concatenated as-is.
        description="Used to increase the now-time of "
        "the environment using the clock function.",
    )
    offset: float = Field(
        title="offset",
        default=0,
        description="Used to offset the now-time of "
        "the environment using the clock function.",
    )
    # True registers the logging clock process when the environment is created.
    clock: bool = False
    model_config = ConfigDict(
        validate_assignment=True, arbitrary_types_allowed=True, extra="forbid"
    )
class Environment:
    """Simpy Environment Distributor. Handles synchronous processes.

    Calling ``Environment(...)`` does not produce an ``Environment`` instance:
    ``__new__`` builds and returns one of the concrete environments, chosen
    from the config.
    """

    def __new__(
        cls, *args, **kwargs
    ) -> Union[
        "InstantEnvironment", "RealtimeEnvironment", "ScaledRealtimeEnvironment"
    ]:
        """Create the environment matching the given config.

        Args:
            *args: Optionally the config as first positional argument. Only
                used when no ``config`` keyword is given.
            **kwargs: ``config`` may be anything accepted by
                ``make_env_config`` (``None``, dict, ``EnvironmentConfig``,
                JSON string or path to a JSON file).

        Returns:
            ``InstantEnvironment`` if ``config.rt`` is false, otherwise
            ``RealtimeEnvironment`` for ``factor == 1`` and
            ``ScaledRealtimeEnvironment`` for every other factor.
        """
        # The keyword wins; the positional fallback prevents ``Environment(cfg)``
        # from silently running with the default config.
        if "config" in kwargs:
            raw_config = kwargs["config"]
        elif args:
            raw_config = args[0]
        else:
            raw_config = None
        config = make_env_config(raw_config)
        if not config.rt:
            return InstantEnvironment(config=config)
        if config.factor == 1:
            return RealtimeEnvironment(config=config)
        return ScaledRealtimeEnvironment(config=config)
def make_env_config(
    config: Union[dict, EnvironmentConfig, str, Path, None],
) -> EnvironmentConfig:
    """Creates the environment config from different sources.

    Args:
        config: One of
            - ``None``: the default ``EnvironmentConfig`` is returned,
            - an ``EnvironmentConfig``: returned unchanged,
            - a ``str`` or ``Path`` pointing to an existing file: the file is
              read as JSON,
            - any other ``str``: parsed as a JSON document,
            - anything else (e.g. a dict): validated directly.

    Returns:
        The validated ``EnvironmentConfig``.
    """
    if config is None:
        return EnvironmentConfig()
    if isinstance(config, EnvironmentConfig):
        return config
    if isinstance(config, (str, Path)):
        try:
            path_exists = Path(config).exists()
        except (OSError, ValueError):
            # An inline JSON string is not a usable path: it may exceed the
            # file-name limit of the OS or contain characters the file system
            # rejects. Treat it as "not a file" and parse it as JSON instead.
            path_exists = False
        if path_exists:
            with open(config, "r") as f:
                config = json.load(f)
        else:
            config = json.loads(config)
    return EnvironmentConfig.model_validate(config)
class CustomSimpyEnvironment(simpy.Environment):
    """A customized version of the simpy environment. Handles execution of modules
    processes and manages time for instant execution mode.

    This base class only reads ``_config``, ``_until`` and ``logger``; the
    concrete environments in this module assign them in their ``__init__``.
    """

    _config: EnvironmentConfig

    @property
    def config(self) -> EnvironmentConfig:
        """Return the config of the environment"""
        return self._config

    @property
    def offset(self) -> float:
        """Start time offset of the environment."""
        return self.config.offset

    @property
    def time(self) -> float:
        """Get the current time of the environment."""
        return self.now

    def clock(self):
        """Define a clock loop to increase the now-timer every other second
        (Or whatever t_sample is)

        Generator meant to be registered as a process: it logs the pretty time
        and then waits ``config.t_sample`` before repeating, forever.
        """
        while True:
            self.logger.info("Current simulation time: %s", self.pretty_time())
            yield self.timeout(self.config.t_sample)

    def pretty_time(self):
        """Placeholder for a human readable time; returns ``None`` here and is
        overridden by the concrete environments."""
        ...

    def pretty_until(self) -> Optional[str]:
        """Human readable end time of the run; ``None`` when not available."""
        return None

    def calculate_percentage_for_pretty_until(self) -> float:
        """Calculate the percentage of completion, accounting for offset.

        Returns:
            Elapsed share of the run in percent, rounded to one decimal.
            ``0.0`` if there is no numeric end time (``run`` not called yet,
            called without ``until``, or called with an ``Event``) or if the
            end time does not lie after the offset.
        """
        until = getattr(self, "_until", UNTIL_UNSET)
        # Without a numeric target there is nothing to measure progress against;
        # subtracting the offset from these values would raise a TypeError.
        if until is None or until is UNTIL_UNSET or isinstance(until, Event):
            return 0.0
        # Account for offset in percentage calculation
        simulation_elapsed = self.time - self.config.offset
        simulation_total = until - self.config.offset
        # Avoid division by zero
        if simulation_total <= 0:
            return 0.0
        return round(simulation_elapsed / simulation_total * 100, 1)
class InstantEnvironment(CustomSimpyEnvironment):
    """A customized version of the simpy environment. Handles execution of modules
    processes and manages time for instant execution mode."""

    def __init__(self, *, config: EnvironmentConfig):
        """Set up the environment, starting its time at ``config.offset``.

        Args:
            config: Validated environment config. If ``config.clock`` is set,
                the logging clock process is registered.
        """
        super().__init__(initial_time=config.offset)
        self._config = config
        self._until = UNTIL_UNSET
        # Create an environment-specific logger using CustomLogger
        self.logger = agentlib_logging.create_logger(env=self, name="environment")
        if self.config.clock:
            self.process(self.clock())

    def pretty_time(self) -> str:
        """Returns the time in seconds."""
        return f"{self.time:.2f}s"

    def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]:
        """Run the environment and remember ``until`` for progress reporting.

        Args:
            until: Forwarded unchanged to the simpy ``run`` implementation.

        Returns:
            Whatever the simpy ``run`` implementation returns.
        """
        self._until = until
        return super().run(until=until)

    def pretty_until(self) -> Union[str, None]:
        """Returns the end time in seconds together with the progress in percent.

        Returns:
            ``None`` if no numeric end time is known, i.e. ``run`` has not been
            called yet, was called without ``until``, or was called with an
            ``Event``. Otherwise a string like ``"100.00s (12.5 %)"``.
        """
        until = self._until
        # Neither the sentinel nor an Event can be rendered with a float format
        # spec, and the sentinel must not leak to callers expecting str or None.
        if until is None or until is UNTIL_UNSET or isinstance(until, Event):
            return None
        percent_finished = self.calculate_percentage_for_pretty_until()
        return f"{until:.2f}s ({percent_finished} %)"
class RealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment):
    """Realtime flavour of the customized simpy environment: executes module
    processes in step with the wall clock and reports wall-clock based time."""

    def __init__(self, *, config: EnvironmentConfig):
        """Build the environment from ``config`` and register a clock process.

        ``offset``, ``factor`` and ``strict`` are handed to the simpy realtime
        environment. Either the logging clock or the silent clock is always
        registered as a process, depending on ``config.clock``.
        """
        super().__init__(
            initial_time=config.offset,
            factor=config.factor,
            strict=config.strict,
        )
        self._config = config
        self._until = UNTIL_UNSET
        self.logger = agentlib_logging.create_logger(env=self, name="environment")
        ticker = self.clock if self.config.clock else self.silent_clock
        self.process(ticker())

    def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]:
        """Call ``sync``, remember ``until`` and delegate to the simpy ``run``."""
        self.sync()
        self._until = until
        result = super().run(until=until)
        return result

    @property
    def time(self) -> float:
        """Current system time as unix timestamp, shifted by the environment
        offset."""
        wall_clock = time.time()
        return wall_clock + self.config.offset

    def pretty_time(self) -> str:
        """Returns the time in a datetime format."""
        moment = datetime.fromtimestamp(self.time)
        return moment.strftime("%d-%b-%Y %H:%M:%S")

    def silent_clock(self):
        """Clock process without logging: just waits ``t_sample`` over and over."""
        while True:
            tick = self.timeout(self.config.t_sample)
            yield tick
class ScaledRealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment):
    """Scaled realtime flavour of the customized simpy environment: executes
    module processes in realtime mode with a factor and reports simulation time."""

    def __init__(self, *, config: EnvironmentConfig):
        """Build the environment from ``config`` and register a clock process.

        ``offset``, ``factor`` and ``strict`` are handed to the simpy realtime
        environment. Either the logging clock or the silent clock is always
        registered as a process, depending on ``config.clock``.
        """
        super().__init__(
            initial_time=config.offset,
            factor=config.factor,
            strict=config.strict,
        )
        self._config = config
        self._until = UNTIL_UNSET
        self.logger = agentlib_logging.create_logger(env=self, name="environment")
        ticker = self.clock if self.config.clock else self.silent_clock
        self.process(ticker())

    def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]:
        """Call ``sync``, remember ``until`` and delegate to the simpy ``run``."""
        self.sync()
        self._until = until
        result = super().run(until=until)
        return result

    @property
    def time(self) -> float:
        """Current time of the environment, i.e. its ``now`` value."""
        current = self.now
        return current

    def pretty_time(self) -> str:
        """Returns the time in seconds."""
        seconds = self.time
        return f"{seconds:.2f}s"

    def silent_clock(self):
        """Clock process without logging: just waits ``t_sample`` over and over."""
        while True:
            tick = self.timeout(self.config.t_sample)
            yield tick
def monkey_patch_simpy_process():
    """Removes the exception catching in simpy processes. This removes some of simpys
    features that we do not need. In return, it improves debugging and makes error
    messages more concise.

    Replaces ``simpy.Process._resume`` with ``new_resume`` defined below. In
    ``new_resume`` the only exception caught while advancing the process
    generator is ``StopIteration``; every other exception raised by the
    generator propagates to the caller.
    """

    def _describe_frame(frame) -> str:
        """Print filename, line number and function name of a stack frame.

        The text is returned, not printed. If the source line can be read
        from the file, it is appended on a second line.
        """
        filename, name = frame.f_code.co_filename, frame.f_code.co_name
        lineno = frame.f_lineno

        with open(filename) as f:
            # ``enumerate`` counts from 0 while frame line numbers count from 1.
            for no, line in enumerate(f):
                if no + 1 == lineno:
                    return (
                        f' File "{filename}", line {lineno}, in {name}\n'
                        f" {line.strip()}\n"
                    )
            # Line not found in the file: return the location without source text.
            return f' File "{filename}", line {lineno}, in {name}\n'

    def new_resume(self, event: Event) -> None:
        """Resumes the execution of the process with the value of *event*. If
        the process generator exits, the process itself will get triggered with
        the return value or the exception of the generator."""
        # Mark the current process as active.
        self.env._active_proc = self

        while True:
            # Get next event from process
            try:
                if event._ok:
                    event = self._generator.send(event._value)
                else:
                    # The process has no choice but to handle the failed event
                    # (or fail itself).
                    event._defused = True

                    # Create an exclusive copy of the exception for this
                    # process to prevent traceback modifications by other
                    # processes.
                    exc = type(event._value)(*event._value.args)
                    exc.__cause__ = event._value
                    event = self._generator.throw(exc)
            except StopIteration as e:
                # Process has terminated.
                event = None  # type: ignore
                self._ok = True
                # The generator's return value travels in the StopIteration args.
                self._value = e.args[0] if len(e.args) else None
                self.env.schedule(self)
                break

            # Process returned another event to wait upon.
            try:
                # Be optimistic and blindly access the callbacks attribute.
                if event.callbacks is not None:
                    # The event has not yet been triggered. Register callback
                    # to resume the process if that happens.
                    event.callbacks.append(self._resume)
                    break
                # ``callbacks`` is None: no break, so the loop continues and the
                # event's value is passed to the generator right away.
            except AttributeError:
                # Our optimism didn't work out, figure out what went wrong and
                # inform the user.
                if hasattr(event, "callbacks"):
                    # The attribute exists, so the AttributeError came from
                    # somewhere else; re-raise it unchanged.
                    raise

                msg = f'Invalid yield value "{event}"'
                descr = _describe_frame(self._generator.gi_frame)
                raise RuntimeError(f"\n{descr}{msg}") from None

        # Reached only via ``break``: remember what the process waits for (None
        # after termination) and clear the active process marker.
        self._target = event
        self.env._active_proc = None

    simpy.Process._resume = new_resume
# Executed at module level, so the patch is applied when this module is imported.
monkey_patch_simpy_process()