Coverage for agentlib/core/environment.py: 77%
120 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1"""This module contains the Environment class, used by all Agents and Modules.
3This module contains modified code of simpy (https://gitlab.com/team-simpy/simpy).
4Simpy is distributed under the MIT License
6 The MIT License (MIT)
8 Copyright (c) 2013 Ontje Lünsdorf and Stefan Scherfke (also see AUTHORS.txt)
10 Permission is hereby granted, free of charge, to any person obtaining a copy of
11 this software and associated documentation files (the "Software"), to deal in
12 the Software without restriction, including without limitation the rights to
13 use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
14 the Software, and to permit persons to whom the Software is furnished to do so,
15 subject to the following conditions:
17 The above copyright notice and this permission notice shall be included in all
18 copies or substantial portions of the Software.
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
22 FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
23 COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
24 IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
25 CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26"""
28import json
29import logging
30import time
31from datetime import datetime
32from pathlib import Path
33from typing import Union, Any, Optional
35import simpy
36from pydantic import (
37 ConfigDict,
38 PositiveFloat,
39 BaseModel,
40 Field,
41)
42from simpy.core import SimTime, Event
44logger = logging.getLogger(name=__name__)
47class EnvironmentConfig(BaseModel):
48 """Config for the Environment"""
50 rt: bool = False
51 factor: PositiveFloat = 1.0
52 strict: bool = False
53 t_sample: PositiveFloat = Field(
54 title="t_sample",
55 default=60,
56 description="Used to increase the now-time of"
57 "the environment using the clock function.",
58 )
59 offset: float = Field(
60 title="offset",
61 default=0,
62 description="Used to offset the now-time of"
63 "the environment using the clock function.",
64 )
65 clock: bool = False
66 model_config = ConfigDict(
67 validate_assignment=True, arbitrary_types_allowed=True, extra="forbid"
68 )
71class Environment:
72 """Simpy Environment Distributor. Handles synchronous processes."""
74 def __new__(
75 cls, *args, **kwargs
76 ) -> Union["RealtimeEnvironment", "InstantEnvironment"]:
77 config = make_env_config(kwargs.get("config"))
78 if not config.rt:
79 return InstantEnvironment(config=config)
80 if config.factor == 1:
81 return RealtimeEnvironment(config=config)
82 return ScaledRealtimeEnvironment(config=config)
85def make_env_config(
86 config: Union[dict, EnvironmentConfig, str, None],
87) -> EnvironmentConfig:
88 """Creates the environment config from different sources."""
89 if config is None:
90 return EnvironmentConfig()
91 if isinstance(config, EnvironmentConfig):
92 return config
93 if isinstance(config, (str, Path)):
94 if Path(config).exists():
95 with open(config, "r") as f:
96 config = json.load(f)
97 else:
98 config = json.loads(config)
99 return EnvironmentConfig.model_validate(config)
102class CustomSimpyEnvironment(simpy.Environment):
103 """A customized version of the simpy environment. Handles execution of modules
104 processes and manages time for instant execution mode."""
106 _config: EnvironmentConfig
108 @property
109 def config(self) -> EnvironmentConfig:
110 """Return the config of the environment"""
111 return self._config
113 @property
114 def offset(self) -> EnvironmentConfig:
115 """Start time offset of the environment."""
116 return self.config.offset
118 @property
119 def time(self) -> float:
120 """Get the current time of the environment."""
121 return self.now
123 def clock(self):
124 """Define a clock loop to increase the now-timer every other second
125 (Or whatever t_sample is)"""
126 while True:
127 logger.info("Current simulation time: %s", self.pretty_time())
128 yield self.timeout(self.config.t_sample)
130 def pretty_time(self): ...
133class InstantEnvironment(CustomSimpyEnvironment):
134 """A customized version of the simpy environment. Handles execution of modules
135 processes and manages time for instant execution mode."""
137 def __init__(self, *, config: EnvironmentConfig):
138 super().__init__(initial_time=config.offset)
139 self._config = config
140 if self.config.clock:
141 self.process(self.clock())
143 def pretty_time(self) -> str:
144 """Returns the time in seconds."""
145 return f"{self.time:.2f}s"
148class RealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment):
149 """A customized version of the simpy environment. Handles execution of modules
150 processes and manages time for real time execution mode."""
152 def __init__(self, *, config: EnvironmentConfig):
153 super().__init__(
154 initial_time=config.offset, factor=config.factor, strict=config.strict
155 )
156 self._config = config
157 if self.config.clock:
158 self.process(self.clock())
159 else:
160 self.process(self.silent_clock())
162 def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]:
163 self.sync()
164 return super().run(until=until)
166 @property
167 def time(self) -> float:
168 """Get the current system time as unix timestamp, with the enivronement
169 offset."""
170 return time.time() + self.config.offset
172 def pretty_time(self) -> str:
173 """Returns the time in a datetime format."""
174 return datetime.fromtimestamp(self.time).strftime("%d-%b-%Y %H:%M:%S")
176 def silent_clock(self):
177 """A silent clock, which does not log anything."""
178 while True:
179 yield self.timeout(self.config.t_sample)
182class ScaledRealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment):
183 """A customized version of the simpy environment. Handles execution of modules
184 processes and manages time for scaled real time execution mode."""
186 def __init__(self, *, config: EnvironmentConfig):
187 super().__init__(
188 initial_time=config.offset, factor=config.factor, strict=config.strict
189 )
190 self._config = config
191 if self.config.clock:
192 self.process(self.clock())
193 else:
194 self.process(self.silent_clock())
196 def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]:
197 self.sync()
198 return super().run(until=until)
200 @property
201 def time(self) -> float:
202 """Get the current time of the environment."""
203 return self.now
205 def pretty_time(self) -> str:
206 """Returns the time in seconds."""
207 return f"{self.time:.2f}s"
209 def silent_clock(self):
210 """A silent clock, which does not log anything."""
211 while True:
212 yield self.timeout(self.config.t_sample)
215def monkey_patch_simpy_process():
216 """Removes the exception catching in simpy processes. This removes some of simpys
217 features that we do not need. In return, it improves debugging and makes error
218 messages more concise.
219 """
221 def _describe_frame(frame) -> str:
222 """Print filename, line number and function name of a stack frame."""
223 filename, name = frame.f_code.co_filename, frame.f_code.co_name
224 lineno = frame.f_lineno
226 with open(filename) as f:
227 for no, line in enumerate(f):
228 if no + 1 == lineno:
229 return (
230 f' File "{filename}", line {lineno}, in {name}\n'
231 f" {line.strip()}\n"
232 )
233 return f' File "{filename}", line {lineno}, in {name}\n'
235 def new_resume(self, event: Event) -> None:
236 """Resumes the execution of the process with the value of *event*. If
237 the process generator exits, the process itself will get triggered with
238 the return value or the exception of the generator."""
239 # Mark the current process as active.
240 self.env._active_proc = self
242 while True:
243 # Get next event from process
244 try:
245 if event._ok:
246 event = self._generator.send(event._value)
247 else:
248 # The process has no choice but to handle the failed event
249 # (or fail itself).
250 event._defused = True
252 # Create an exclusive copy of the exception for this
253 # process to prevent traceback modifications by other
254 # processes.
255 exc = type(event._value)(*event._value.args)
256 exc.__cause__ = event._value
257 event = self._generator.throw(exc)
258 except StopIteration as e:
259 # Process has terminated.
260 event = None # type: ignore
261 self._ok = True
262 self._value = e.args[0] if len(e.args) else None
263 self.env.schedule(self)
264 break
266 # Process returned another event to wait upon.
267 try:
268 # Be optimistic and blindly access the callbacks attribute.
269 if event.callbacks is not None:
270 # The event has not yet been triggered. Register callback
271 # to resume the process if that happens.
272 event.callbacks.append(self._resume)
273 break
274 except AttributeError:
275 # Our optimism didn't work out, figure out what went wrong and
276 # inform the user.
277 if hasattr(event, "callbacks"):
278 raise
280 msg = f'Invalid yield value "{event}"'
281 descr = _describe_frame(self._generator.gi_frame)
282 raise RuntimeError(f"\n{descr}{msg}") from None
284 self._target = event
285 self.env._active_proc = None
287 simpy.Process._resume = new_resume
290monkey_patch_simpy_process()