Coverage for agentlib/models/fmu_model.py: 32%
164 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1"""This module contains the FMUModel class."""
3import queue
4import shutil
5import os
6import logging
7import pathlib
8import uuid
9from itertools import chain
10from typing import Union, List
12import attrs
13import pydantic
15from pydantic import field_validator, FilePath
16from agentlib.core import Model, ModelConfig
17from agentlib.core.errors import OptionalDependencyError
18from agentlib.core.datamodels import ModelVariable, Causality
20try:
21 import fmpy.fmi2
22 import fmpy
23 from fmpy.fmi1 import FMICallException
24except ImportError as err:
25 raise OptionalDependencyError(
26 dependency_name="fmu", dependency_install="fmpy", used_object="FMU-model"
27 ) from err
29logger = logging.getLogger(__name__)
32class FmuModelConfig(ModelConfig):
33 """
34 The Config of FMUModels overwrite the default
35 ModelConfig to redefine the system and add relevant
36 fields like path and tolerance of the simulation.
37 """
39 path: FilePath
40 tolerance: float = 0.001
41 extract_fmu: bool = False
42 log_fmu: bool = True
43 only_config_variables: bool = pydantic.Field(
44 default=True,
45 description="If True, only the variables passed to this model by a simulator "
46 "will be read and written at each simulation step (specified by "
47 "dt).",
48 )
50 @field_validator("path")
51 @classmethod
52 def check_path(cls, path):
53 """Check if the path has the correct extension"""
54 # check file extension
55 assert path.suffix in [".fmu", ".mo"], "Unknown file-extension"
56 return path
59class FmuModel(Model):
60 """Class to wrap any FMU Model into the Model-Standard
61 of the agentlib.
62 """
64 config: FmuModelConfig
66 def __init__(self, **kwargs):
67 # Private map to link variable names (str) to the fmu value reference (int)
68 self._variables_vr = {}
69 self._unzip_dir = None
70 # Initialize config
71 super().__init__(**kwargs)
72 # Actively set the path to trigger the automatic setup (evokes __init_fmu)
73 self.system = self.__init_fmu()
74 initial_write_values = self.inputs + self.parameters
75 self._variables_to_write = queue.Queue()
76 [self._variables_to_write.put(v) for v in initial_write_values]
78 @property
79 def tolerance(self):
80 """Get the tolerance of FMU simulation"""
81 return self.config.tolerance
83 @tolerance.setter
84 def tolerance(self, tolerance: float):
85 """Set the tolerance in the config."""
86 self.config.tolerance = tolerance
88 @tolerance.deleter
89 def tolerance(self):
90 """Delete the tolerance and restore the default value."""
91 self.config.tolerance = self.get_config_type().tolerance
93 @property
94 def extract_fmu(self):
95 """Get whether the fmu shall be extracted to a new
96 directory or if the temp folder is used."""
97 return self.config.extract_fmu
99 def do_step(self, *, t_start, t_sample=None):
100 if t_sample is None:
101 t_sample = self.dt
102 # Write current values to system
103 while not self._variables_to_write.empty():
104 self.__write_value(self._variables_to_write.get_nowait())
105 t_samples = self._create_time_samples(t_sample=t_sample) + t_start
106 try:
107 for _idx, _t_sample in enumerate(t_samples[:-1]):
108 self.system.doStep(
109 currentCommunicationPoint=_t_sample,
110 communicationStepSize=t_samples[_idx + 1] - _t_sample,
111 )
112 except FMICallException as e:
113 # Raise a different error, as simpy does not work well with FMI Errors
114 raise RuntimeError(
115 "The fmu had an internal error. Please check the logs to analyze it."
116 ) from e
117 # Read current values from system
118 self.__read_values()
119 return True
121 def initialize(self, **kwargs):
122 """
123 Initializes FMU model
125 Required kwargs:
126 t_start (float): Start time of simulation
127 t_stop (float): Stop time of simulation
128 """
129 logger.info("Initializing model...")
130 # Handle Logging of the FMU itself:
131 try:
132 callbacks = fmpy.fmi2.fmi2CallbackFunctions()
133 callbacks.logger = fmpy.fmi2.fmi2CallbackLoggerTYPE(self._fmu_logger)
134 callbacks.allocateMemory = fmpy.fmi2.fmi2CallbackAllocateMemoryTYPE(
135 fmpy.calloc
136 )
137 callbacks.freeMemory = fmpy.fmi2.fmi2CallbackFreeMemoryTYPE(fmpy.free)
138 from fmpy.logging import addLoggerProxy
139 from ctypes import byref
141 addLoggerProxy(byref(callbacks))
142 except Exception as err:
143 logger.error("Could not setup custom logger in FMU model: %s", err)
144 callbacks = None
146 self.system.instantiate(
147 callbacks=callbacks,
148 loggingOn=False, # Only for debug of fmu itself
149 visible=False, # Only for debug of fmu itself
150 )
151 self.system.reset()
152 self.system.setupExperiment(
153 startTime=kwargs["t_start"],
154 stopTime=kwargs["t_stop"],
155 tolerance=self.tolerance,
156 )
157 self.system.enterInitializationMode()
158 self.system.exitInitializationMode()
159 logger.info("Model: %s initialized", self.name)
161 def _fmu_logger(self, component, instanceName, status, category, message):
162 """Print the FMU's log messages to the command line (works for both FMI 1.0 and 2.0)"""
163 # pylint: disable=unused-argument, invalid-name
164 if self.config.log_fmu:
165 label = ["OK", "WARNING", "DISCARD", "ERROR", "FATAL", "PENDING"][status]
166 _level_map = {
167 "OK": logging.INFO,
168 "WARNING": logging.WARNING,
169 "DISCARD": logging.WARNING,
170 "ERROR": logging.ERROR,
171 "FATAL": logging.FATAL,
172 "PENDING": logging.FATAL,
173 }
174 logger.log(level=_level_map[label], msg=message.decode("utf-8"))
176 def __init_fmu(self) -> fmpy.fmi2.FMU2Slave:
177 path = self.config.path
178 # System setup:
179 # extract the FMU
180 if self.extract_fmu:
181 # Create own unzip directory
182 _path = pathlib.Path(path)
183 if not _path.is_absolute():
184 _path = pathlib.Path(os.getcwd()).joinpath(_path)
185 _unzip_dir = _path.parents[0].joinpath(
186 f'{_path.name.replace(".fmu", "")}' f"_extracted_{uuid.uuid4()}"
187 )
188 _unzip_dir = str(_unzip_dir)
189 else:
190 _unzip_dir = None
191 cur_cwd = os.getcwd()
192 self._unzip_dir = fmpy.extract(filename=path, unzipdir=_unzip_dir)
193 os.chdir(cur_cwd) # Reset cwd. fmpy changes it sometimes.
194 # Read the model description
195 _model_description = fmpy.read_model_description(self._unzip_dir, validate=True)
196 _system = fmpy.fmi2.FMU2Slave(
197 guid=_model_description.guid,
198 unzipDirectory=self._unzip_dir,
199 modelIdentifier=_model_description.coSimulation.modelIdentifier,
200 instanceName=__name__,
201 fmiCallLogger=None,
202 )
203 self.name = _model_description.modelName
204 if _model_description.description is not None:
205 self.description = _model_description.description
207 # Variable setup:
208 # Get the inputs, outputs, internals and parameters
209 self._variables_vr = {}
210 _vars = {
211 Causality.input: [],
212 Causality.parameter: [],
213 Causality.calculatedParameter: [],
214 Causality.output: [],
215 Causality.local: [],
216 Causality.independent: [],
217 }
218 config_vars = set(self.config.get_variable_names())
219 for _model_var in _model_description.modelVariables:
220 # Convert to an agentlib ModelVariable object
221 if _model_var.type == "String":
222 logger.warning(
223 "String variable %s omitted. Not supported in AgentLib.",
224 _model_var.name,
225 )
226 continue # Don't allow string model variables
228 # if desired, we skip adding variables to this model instance if they are
229 # not specified from outside. They will remain within the
230 # fmpy.fmi2.FMU2Slave instance
231 if self.config.only_config_variables and _model_var.name not in config_vars:
232 continue
234 _vars[_model_var.causality].append(
235 dict(
236 name=_model_var.name,
237 type=_model_var.type,
238 value=(
239 self._converter(_model_var.type, _model_var.start)
240 if (
241 _model_var.causality
242 in [
243 Causality.parameter,
244 Causality.calculatedParameter,
245 Causality.input,
246 ]
247 and _model_var.start is not None
248 )
249 else None
250 ),
251 unit=(
252 _model_var.unit
253 if _model_var.unit is not None
254 else attrs.fields(ModelVariable).unit.default
255 ),
256 description=(
257 _model_var.description
258 if _model_var.description is not None
259 else attrs.fields(ModelVariable).description.default
260 ),
261 causality=_model_var.causality,
262 variability=_model_var.variability,
263 )
264 )
265 self._variables_vr.update({_model_var.name: _model_var.valueReference})
266 # This sets the inputs, outputs, internals and parameters and variables
267 self.config = {
268 "inputs": _vars[Causality.input],
269 "outputs": _vars[Causality.output],
270 "parameters": _vars[Causality.parameter]
271 + _vars[Causality.calculatedParameter],
272 "states": _vars[Causality.local] + _vars[Causality.independent],
273 **self.config.model_dump(
274 exclude={"inputs", "outputs", "states", "parameters"}
275 ),
276 }
277 return _system
279 def __write_value(self, var: ModelVariable):
280 # One can only set inputs and parameters!
282 if var.value is None:
283 logger.error(
284 "Tried setting the value of %s to None. This will not be set in the "
285 "FMU.",
286 var.name,
287 )
288 return
290 _vr = self._variables_vr[var.name]
291 if var.type == "Real":
292 self.system.setReal([_vr], [float(var.value)])
293 elif var.type in ["Integer", "Enumeration"]:
294 self.system.setInteger([_vr], [int(var.value)])
295 elif var.type == "Boolean":
296 self.system.setBoolean([_vr], [bool(var.value)])
297 else:
298 logger.error("Variable %s not valid for this model!", var.name)
300 def set_input_values(self, names: List[str], values: List[Union[float, int, bool]]):
301 """Sets input values in the model and in the FMU."""
302 super().set_input_values(names=names, values=values)
303 for name in names:
304 var = self._inputs[name]
305 self._variables_to_write.put(var)
307 def set_parameter_values(
308 self, names: List[str], values: List[Union[float, int, bool]]
309 ):
310 """Sets parameter values in the model and in the FMU."""
311 super().set_parameter_values(names=names, values=values)
312 for name in names:
313 var = self._parameters[name]
314 self._variables_to_write.put(var)
316 def __read_values(self):
317 for _var in chain.from_iterable([self.outputs, self.parameters, self.states]):
318 _vr = self._variables_vr[_var.name]
319 if _var.type == "Real":
320 _var.value = self.system.getReal([_vr])[0]
321 elif _var.type in ["Integer", "Enumeration"]:
322 _var.value = self.system.getInteger([_vr])[0]
323 elif _var.type == "Boolean":
324 _var.value = self.system.getBoolean([_vr])[0]
325 else:
326 raise TypeError(
327 f"Unsupported type: {_var.type} for variable {_var.name}"
328 )
330 def __enter__(self):
331 self._terminate_and_free_instance()
333 def __exit__(self, exc_type, exc_val, exc_tb):
334 self._terminate_and_free_instance()
335 # clean up
336 shutil.rmtree(self._unzip_dir, ignore_errors=True)
338 def _terminate_and_free_instance(self):
339 try:
340 self.system.terminate()
341 except Exception as err:
342 logger.error("Could not terminate FMU instance %s", err)
343 try:
344 self.system.freeInstance()
345 except Exception as err:
346 logger.error("Could not terminate FMU instance %s", err)
348 def terminate(self):
349 """Overwrite base method"""
350 self.__exit__(exc_type=None, exc_val=None, exc_tb=None)
352 @staticmethod
353 def _converter(type_of_var: str, value):
354 _mapper = {"Boolean": bool, "Real": float, "Integer": int, "Enumeration": int}
355 if type_of_var in _mapper:
356 return _mapper[type_of_var](value)
357 # else
358 return value