Source code for agentlib.core.environment

"""This module contains the Environment class, used by all Agents and Modules.

This module contains modified code of simpy (https://gitlab.com/team-simpy/simpy).
Simpy is distributed under the MIT License

    The MIT License (MIT)

    Copyright (c) 2013 Ontje Lünsdorf and Stefan Scherfke (also see AUTHORS.txt)

    Permission is hereby granted, free of charge, to any person obtaining a copy of
    this software and associated documentation files (the "Software"), to deal in
    the Software without restriction, including without limitation the rights to
    use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
    the Software, and to permit persons to whom the Software is furnished to do so,
    subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
    FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
    COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
    IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
    CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""

import json
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Union, Any, Optional

import simpy
from pydantic import (
    ConfigDict,
    PositiveFloat,
    BaseModel,
    Field,
)
from simpy.core import SimTime, Event

logger = logging.getLogger(name=__name__)


[docs]class EnvironmentConfig(BaseModel): """Config for the Environment""" rt: bool = False factor: PositiveFloat = 1.0 strict: bool = False t_sample: PositiveFloat = Field( title="t_sample", default=60, description="Used to increase the now-time of" "the environment using the clock function.", ) offset: float = Field( title="offset", default=0, description="Used to offset the now-time of" "the environment using the clock function.", ) clock: bool = False model_config = ConfigDict( validate_assignment=True, arbitrary_types_allowed=True, extra="forbid" )
[docs]class Environment: """Simpy Environment Distributor. Handles synchronous processes.""" def __new__( cls, *args, **kwargs ) -> Union["RealtimeEnvironment", "InstantEnvironment"]: config = make_env_config(kwargs.get("config")) if not config.rt: return InstantEnvironment(config=config) if config.factor == 1: return RealtimeEnvironment(config=config) return ScaledRealtimeEnvironment(config=config)
[docs]def make_env_config( config: Union[dict, EnvironmentConfig, str, None], ) -> EnvironmentConfig: """Creates the environment config from different sources.""" if config is None: return EnvironmentConfig() if isinstance(config, EnvironmentConfig): return config if isinstance(config, (str, Path)): if Path(config).exists(): with open(config, "r") as f: config = json.load(f) else: config = json.loads(config) return EnvironmentConfig.model_validate(config)
[docs]class CustomSimpyEnvironment(simpy.Environment): """A customized version of the simpy environment. Handles execution of modules processes and manages time for instant execution mode.""" _config: EnvironmentConfig @property def config(self) -> EnvironmentConfig: """Return the config of the environment""" return self._config @property def offset(self) -> EnvironmentConfig: """Start time offset of the environment.""" return self.config.offset @property def time(self) -> float: """Get the current time of the environment.""" return self.now
[docs] def clock(self): """Define a clock loop to increase the now-timer every other second (Or whatever t_sample is)""" while True: logger.info("Current simulation time: %s", self.pretty_time()) yield self.timeout(self.config.t_sample)
[docs] def pretty_time(self): ...
[docs]class InstantEnvironment(CustomSimpyEnvironment): """A customized version of the simpy environment. Handles execution of modules processes and manages time for instant execution mode.""" def __init__(self, *, config: EnvironmentConfig): super().__init__(initial_time=config.offset) self._config = config if self.config.clock: self.process(self.clock())
[docs] def pretty_time(self) -> str: """Returns the time in seconds.""" return f"{self.time:.2f}s"
[docs]class RealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment): """A customized version of the simpy environment. Handles execution of modules processes and manages time for real time execution mode.""" def __init__(self, *, config: EnvironmentConfig): super().__init__( initial_time=config.offset, factor=config.factor, strict=config.strict ) self._config = config if self.config.clock: self.process(self.clock()) else: self.process(self.silent_clock())
[docs] def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]: self.sync() return super().run(until=until)
@property def time(self) -> float: """Get the current system time as unix timestamp, with the enivronement offset.""" return time.time() + self.config.offset
[docs] def pretty_time(self) -> str: """Returns the time in a datetime format.""" return datetime.fromtimestamp(self.time).strftime("%d-%b-%Y %H:%M:%S")
[docs] def silent_clock(self): """A silent clock, which does not log anything.""" while True: yield self.timeout(self.config.t_sample)
[docs]class ScaledRealtimeEnvironment(simpy.RealtimeEnvironment, CustomSimpyEnvironment): """A customized version of the simpy environment. Handles execution of modules processes and manages time for scaled real time execution mode.""" def __init__(self, *, config: EnvironmentConfig): super().__init__( initial_time=config.offset, factor=config.factor, strict=config.strict ) self._config = config if self.config.clock: self.process(self.clock()) else: self.process(self.silent_clock())
[docs] def run(self, until: Optional[Union[SimTime, Event]] = None) -> Optional[Any]: self.sync() return super().run(until=until)
@property def time(self) -> float: """Get the current time of the environment.""" return self.now
[docs] def pretty_time(self) -> str: """Returns the time in seconds.""" return f"{self.time:.2f}s"
[docs] def silent_clock(self): """A silent clock, which does not log anything.""" while True: yield self.timeout(self.config.t_sample)
[docs]def monkey_patch_simpy_process(): """Removes the exception catching in simpy processes. This removes some of simpys features that we do not need. In return, it improves debugging and makes error messages more concise. """ def _describe_frame(frame) -> str: """Print filename, line number and function name of a stack frame.""" filename, name = frame.f_code.co_filename, frame.f_code.co_name lineno = frame.f_lineno with open(filename) as f: for no, line in enumerate(f): if no + 1 == lineno: return ( f' File "{filename}", line {lineno}, in {name}\n' f" {line.strip()}\n" ) return f' File "{filename}", line {lineno}, in {name}\n' def new_resume(self, event: Event) -> None: """Resumes the execution of the process with the value of *event*. If the process generator exits, the process itself will get triggered with the return value or the exception of the generator.""" # Mark the current process as active. self.env._active_proc = self while True: # Get next event from process try: if event._ok: event = self._generator.send(event._value) else: # The process has no choice but to handle the failed event # (or fail itself). event._defused = True # Create an exclusive copy of the exception for this # process to prevent traceback modifications by other # processes. exc = type(event._value)(*event._value.args) exc.__cause__ = event._value event = self._generator.throw(exc) except StopIteration as e: # Process has terminated. event = None # type: ignore self._ok = True self._value = e.args[0] if len(e.args) else None self.env.schedule(self) break # Process returned another event to wait upon. try: # Be optimistic and blindly access the callbacks attribute. if event.callbacks is not None: # The event has not yet been triggered. Register callback # to resume the process if that happens. event.callbacks.append(self._resume) break except AttributeError: # Our optimism didn't work out, figure out what went wrong and # inform the user. if hasattr(event, "callbacks"): raise msg = f'Invalid yield value "{event}"' descr = _describe_frame(self._generator.gi_frame) raise RuntimeError(f"\n{descr}{msg}") from None self._target = event self.env._active_proc = None simpy.Process._resume = new_resume
monkey_patch_simpy_process()