Coverage for agentlib/models/fmu_model.py: 33%
165 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-12-23 08:15 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-12-23 08:15 +0000
1"""This module contains the FMUModel class."""
3import queue
4import shutil
5import os
6import logging
7import pathlib
8import uuid
9from itertools import chain
10from typing import Union, List
12import attrs
13import pydantic
14from pydantic import field_validator, FilePath
16from agentlib.utils import create_time_samples
17from agentlib.core import Model, ModelConfig
18from agentlib.core.errors import OptionalDependencyError
19from agentlib.core.datamodels import ModelVariable, Causality
21try:
22 import fmpy.fmi2
23 import fmpy
24 from fmpy.fmi1 import FMICallException
25except ImportError as err:
26 raise OptionalDependencyError(
27 dependency_name="fmu", dependency_install="fmpy", used_object="FMU-model"
28 ) from err
30logger = logging.getLogger(__name__)
class FmuModelConfig(ModelConfig):
    """
    The Config of FMUModels overwrites the default
    ModelConfig to redefine the system and add relevant
    fields like path and tolerance of the simulation.
    """

    # Location of the model file. The suffix is checked by ``check_path``.
    path: FilePath
    # Tolerance that is passed to the FMU when the experiment is set up.
    tolerance: float = 0.001
    # Whether the FMU is extracted to a new directory instead of the temp folder.
    extract_fmu: bool = False
    # Whether log messages emitted by the FMU are forwarded to the python logger.
    log_fmu: bool = True
    only_config_variables: bool = pydantic.Field(
        default=True,
        description="If True, only the variables passed to this model by a simulator "
        "will be read and written at each simulation step (specified by "
        "dt).",
    )

    @field_validator("path")
    @classmethod
    def check_path(cls, path):
        """Check if the path has the correct extension.

        Args:
            path: The already parsed path of the model file.

        Returns:
            The unchanged path if its suffix is supported.

        Raises:
            ValueError: If the suffix is neither ``.fmu`` nor ``.mo``. Pydantic
                reports this as a validation error of the config.
        """
        # Raise explicitly instead of using ``assert``: assert statements are
        # removed when Python runs with ``-O``, which would silently disable
        # this check.
        if path.suffix not in (".fmu", ".mo"):
            raise ValueError("Unknown file-extension")
        return path
class FmuModel(Model):
    """Class to wrap any FMU Model into the Model-Standard
    of the agentlib.

    The FMU is loaded as an FMI 2.0 co-simulation slave through fmpy. Values of
    inputs and parameters are queued and written to the FMU at the beginning of
    the next ``do_step``; outputs, parameters and states are read back after
    each step.
    """

    config: FmuModelConfig

    def __init__(self, **kwargs):
        # Private map to link variable names (str) to the fmu value reference (int)
        self._variables_vr = {}
        self._unzip_dir = None
        # Initialize config
        super().__init__(**kwargs)
        # Extract and load the FMU. This also rewrites the config with the
        # variables found in the model description.
        self.system = self.__init_fmu()
        # Queue all inputs and parameters so that their current values are
        # written to the FMU at the beginning of the first step.
        self._variables_to_write = queue.Queue()
        for var in self.inputs + self.parameters:
            self._variables_to_write.put(var)

    @property
    def tolerance(self):
        """Get the tolerance of FMU simulation"""
        return self.config.tolerance

    @tolerance.setter
    def tolerance(self, tolerance: float):
        """Set the tolerance in the config."""
        self.config.tolerance = tolerance

    @tolerance.deleter
    def tolerance(self):
        """Delete the tolerance and restore the default value."""
        # Pydantic (v2) does not expose field defaults as class attributes, so
        # the default has to be read from the field definition.
        config_type = self.get_config_type()
        self.config.tolerance = config_type.model_fields["tolerance"].default

    @property
    def extract_fmu(self):
        """Get whether the fmu shall be extracted to a new
        directory or if the temp folder is used."""
        return self.config.extract_fmu

    def do_step(self, *, t_start, t_sample=None):
        """Advance the FMU, starting at ``t_start``.

        Args:
            t_start: Time at which the step starts.
            t_sample: Length of the step. Defaults to ``self.dt``.

        Returns:
            True once all sub-steps were performed and the values were read.

        Raises:
            RuntimeError: If the FMU reports an error during ``doStep``.
        """
        if t_sample is None:
            t_sample = self.dt
        # Write current values to system
        while not self._variables_to_write.empty():
            self.__write_value(self._variables_to_write.get_nowait())
        # Communication points of this step, shifted by the start time
        t_samples = create_time_samples(t_end=t_sample, dt=self.dt) + t_start
        try:
            for t_current, t_next in zip(t_samples[:-1], t_samples[1:]):
                self.system.doStep(
                    currentCommunicationPoint=t_current,
                    communicationStepSize=t_next - t_current,
                )
        except FMICallException as e:
            # Raise a different error, as simpy does not work well with FMI Errors
            raise RuntimeError(
                "The fmu had an internal error. Please check the logs to analyze it."
            ) from e
        # Read current values from system
        self.__read_values()
        return True

    def initialize(self, **kwargs):
        """
        Initializes FMU model

        Required kwargs:
            t_start (float): Start time of simulation
            t_stop (float): Stop time of simulation
        """
        logger.info("Initializing model...")
        # Handle Logging of the FMU itself:
        try:
            callbacks = fmpy.fmi2.fmi2CallbackFunctions()
            callbacks.logger = fmpy.fmi2.fmi2CallbackLoggerTYPE(self._fmu_logger)
            callbacks.allocateMemory = fmpy.fmi2.fmi2CallbackAllocateMemoryTYPE(
                fmpy.calloc
            )
            callbacks.freeMemory = fmpy.fmi2.fmi2CallbackFreeMemoryTYPE(fmpy.free)
            from fmpy.logging import addLoggerProxy
            from ctypes import byref

            addLoggerProxy(byref(callbacks))
        except Exception as err:
            # The custom logger is optional; continue without callbacks.
            logger.error("Could not setup custom logger in FMU model: %s", err)
            callbacks = None

        self.system.instantiate(
            callbacks=callbacks,
            loggingOn=False,  # Only for debug of fmu itself
            visible=False,  # Only for debug of fmu itself
        )
        self.system.reset()
        self.system.setupExperiment(
            startTime=kwargs["t_start"],
            stopTime=kwargs["t_stop"],
            tolerance=self.tolerance,
        )
        self.system.enterInitializationMode()
        self.system.exitInitializationMode()
        logger.info("Model: %s initialized", self.name)

    def _fmu_logger(self, component, instanceName, status, category, message):
        """Forward the FMU's log messages to the python logger (works for both FMI 1.0 and 2.0)"""
        # pylint: disable=unused-argument, invalid-name
        if not self.config.log_fmu:
            return
        # Log level per status code, in the order
        # OK, WARNING, DISCARD, ERROR, FATAL, PENDING
        levels = (
            logging.INFO,
            logging.WARNING,
            logging.WARNING,
            logging.ERROR,
            logging.FATAL,
            logging.FATAL,
        )
        # This method is registered as a callback of the FMU, so it must not
        # raise: fall back to ERROR for status codes outside the known range.
        level = levels[status] if 0 <= status < len(levels) else logging.ERROR
        if isinstance(message, bytes):
            text = message.decode("utf-8", errors="replace")
        else:
            text = str(message)
        logger.log(level=level, msg=text)

    def __init_fmu(self) -> fmpy.fmi2.FMU2Slave:
        """Extract the FMU, create the slave and rebuild the config variables.

        Sets ``self._unzip_dir``, ``self.name``, ``self.description`` (if the
        FMU provides one), ``self._variables_vr`` and ``self.config``.

        Returns:
            The not yet instantiated FMU slave.
        """
        path = self.config.path
        # System setup:
        # extract the FMU
        if self.extract_fmu:
            # Create own unzip directory next to the FMU file
            _path = pathlib.Path(path).absolute()
            _unzip_dir = str(
                _path.parent / f"{_path.stem}_extracted_{uuid.uuid4()}"
            )
        else:
            _unzip_dir = None
        cur_cwd = os.getcwd()
        try:
            self._unzip_dir = fmpy.extract(filename=path, unzipdir=_unzip_dir)
        finally:
            # Reset cwd, also if the extraction fails. fmpy changes it sometimes.
            os.chdir(cur_cwd)
        # Read the model description
        _model_description = fmpy.read_model_description(self._unzip_dir, validate=True)
        _system = fmpy.fmi2.FMU2Slave(
            guid=_model_description.guid,
            unzipDirectory=self._unzip_dir,
            modelIdentifier=_model_description.coSimulation.modelIdentifier,
            instanceName=__name__,
            fmiCallLogger=None,
        )
        self.name = _model_description.modelName
        if _model_description.description is not None:
            self.description = _model_description.description

        # Variable setup:
        # Get the inputs, outputs, internals and parameters
        self._variables_vr = {}
        _vars = {
            Causality.input: [],
            Causality.parameter: [],
            Causality.calculatedParameter: [],
            Causality.output: [],
            Causality.local: [],
            Causality.independent: [],
        }
        # Only these causalities take over the start value of the FMU
        _causalities_with_start = (
            Causality.parameter,
            Causality.calculatedParameter,
            Causality.input,
        )
        # Fallbacks for variables without unit or description in the FMU
        _default_unit = attrs.fields(ModelVariable).unit.default
        _default_description = attrs.fields(ModelVariable).description.default
        config_vars = set(self.config.get_variable_names())
        for _model_var in _model_description.modelVariables:
            # Convert to an agentlib ModelVariable object
            if _model_var.type == "String":
                logger.warning(
                    "String variable %s omitted. Not supported in AgentLib.",
                    _model_var.name,
                )
                continue  # Don't allow string model variables

            # if desired, we skip adding variables to this model instance if they are
            # not specified from outside. They will remain within the
            # fmpy.fmi2.FMU2Slave instance
            if self.config.only_config_variables and _model_var.name not in config_vars:
                continue

            if (
                _model_var.causality in _causalities_with_start
                and _model_var.start is not None
            ):
                _value = self._converter(_model_var.type, _model_var.start)
            else:
                _value = None

            _vars[_model_var.causality].append(
                dict(
                    name=_model_var.name,
                    type=_model_var.type,
                    value=_value,
                    unit=(
                        _model_var.unit
                        if _model_var.unit is not None
                        else _default_unit
                    ),
                    description=(
                        _model_var.description
                        if _model_var.description is not None
                        else _default_description
                    ),
                    causality=_model_var.causality,
                    variability=_model_var.variability,
                )
            )
            self._variables_vr[_model_var.name] = _model_var.valueReference
        # This sets the inputs, outputs, internals and parameters and variables
        self.config = {
            "inputs": _vars[Causality.input],
            "outputs": _vars[Causality.output],
            "parameters": _vars[Causality.parameter]
            + _vars[Causality.calculatedParameter],
            "states": _vars[Causality.local] + _vars[Causality.independent],
            **self.config.model_dump(
                exclude={"inputs", "outputs", "states", "parameters"}
            ),
        }
        return _system

    def __write_value(self, var: ModelVariable):
        """Write the value of a single variable to the FMU.

        Variables without a value are skipped with an error message, as are
        variables of a type that is not Real, Integer, Enumeration or Boolean.
        """
        # One can only set inputs and parameters!

        if var.value is None:
            logger.error(
                "Tried setting the value of %s to None. This will not be set in the "
                "FMU.",
                var.name,
            )
            return

        _vr = self._variables_vr[var.name]
        if var.type == "Real":
            self.system.setReal([_vr], [float(var.value)])
        elif var.type in ["Integer", "Enumeration"]:
            self.system.setInteger([_vr], [int(var.value)])
        elif var.type == "Boolean":
            self.system.setBoolean([_vr], [bool(var.value)])
        else:
            logger.error("Variable %s not valid for this model!", var.name)

    def set_input_values(self, names: List[str], values: List[Union[float, int, bool]]):
        """Sets input values in the model and in the FMU."""
        super().set_input_values(names=names, values=values)
        # The FMU itself is updated at the beginning of the next step
        for name in names:
            self._variables_to_write.put(self._inputs[name])

    def set_parameter_values(
        self, names: List[str], values: List[Union[float, int, bool]]
    ):
        """Sets parameter values in the model and in the FMU."""
        super().set_parameter_values(names=names, values=values)
        # The FMU itself is updated at the beginning of the next step
        for name in names:
            self._variables_to_write.put(self._parameters[name])

    def __read_values(self):
        """Read the values of all outputs, parameters and states from the FMU.

        Raises:
            TypeError: If a variable has a type that can not be read.
        """
        for _var in chain.from_iterable([self.outputs, self.parameters, self.states]):
            _vr = self._variables_vr[_var.name]
            if _var.type == "Real":
                _var.value = self.system.getReal([_vr])[0]
            elif _var.type in ["Integer", "Enumeration"]:
                _var.value = self.system.getInteger([_vr])[0]
            elif _var.type == "Boolean":
                _var.value = self.system.getBoolean([_vr])[0]
            else:
                raise TypeError(
                    f"Unsupported type: {_var.type} for variable {_var.name}"
                )

    def __enter__(self):
        """Enter the context and return the model.

        The FMU must stay alive inside the ``with`` block, so nothing is
        terminated here. Cleanup happens in ``__exit__``.
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Terminate the FMU instance and remove the extracted files."""
        self._terminate_and_free_instance()
        # clean up; there is nothing to remove if the FMU was never extracted
        if self._unzip_dir is not None:
            shutil.rmtree(self._unzip_dir, ignore_errors=True)

    def _terminate_and_free_instance(self):
        """Terminate and free the FMU instance; failures are only logged."""
        try:
            self.system.terminate()
        except Exception as err:
            logger.error("Could not terminate FMU instance %s", err)
        try:
            self.system.freeInstance()
        except Exception as err:
            logger.error("Could not free FMU instance %s", err)

    def terminate(self):
        """Overwrite base method"""
        self.__exit__(exc_type=None, exc_val=None, exc_tb=None)

    @staticmethod
    def _converter(type_of_var: str, value):
        """Convert a start value to the python type matching the FMU type.

        Args:
            type_of_var: Type name of the variable, e.g. "Real" or "Boolean".
            value: The value to convert.

        Returns:
            The converted value, or the unchanged value for unknown types.
        """
        # Textual booleans must be parsed, since bool("false") would be True.
        if type_of_var == "Boolean" and isinstance(value, str):
            return value.strip().lower() in ("true", "1")
        _mapper = {"Boolean": bool, "Real": float, "Integer": int, "Enumeration": int}
        if type_of_var in _mapper:
            return _mapper[type_of_var](value)
        # else
        return value