Coverage for ebcpy/simulationapi/fmu.py: 73%
178 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-20 12:54 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-20 12:54 +0000
1"""Module for classes using a fmu to
2simulate models."""
4import os
5import logging
6import atexit
7import shutil
8from pathlib import Path
9from typing import List, Union
11import fmpy
12from fmpy.model_description import read_model_description
13from pydantic import Field
14import pandas as pd
15import numpy as np
17from ebcpy import simulationapi, TimeSeriesData
18from ebcpy.simulationapi import SimulationSetup, SimulationSetupClass, Variable
19from ebcpy.utils.reproduction import CopyFile
21# pylint: disable=broad-except
class FMU_Setup(SimulationSetup):
    """
    Simulation setup for FMU models.

    Extends the basic `SimulationSetup` with the settings
    that are specific to simulating FMUs.
    """

    # Forwarded as the `timeout` argument of the FMU simulation call;
    # the default of infinity means no limit is configured.
    timeout: float = Field(
        default=np.inf,
        title="timeout",
        description="Timeout after which the simulation stops."
    )

    # Solver used by default and the solver names that may be selected.
    # NOTE(review): presumably evaluated by `SimulationSetup` when
    # validating the `solver` field - confirm against the base class.
    _default_solver = "CVode"
    _allowed_solvers = ["CVode", "Euler"]
class FMU_API(simulationapi.SimulationAPI):
    """
    Class for simulation using the fmpy library and
    a functional mockup interface as a model input.

    :param str,Path model_name:
        Path to the .fmu model to be simulated.
    :param str,Path working_directory:
        Dirpath for the current working directory of simulation
        results. If None (default), the path of the fmu is used.
    :keyword bool log_fmu:
        Whether to print fmu messages or not.

    Example:

    >>> import matplotlib.pyplot as plt
    >>> from ebcpy import FMU_API
    >>> # Select any valid fmu. Replace the line below if
    >>> # you don't have this file on your device.
    >>> model_name = "Path to your fmu"
    >>> fmu_api = FMU_API(model_name)
    >>> fmu_api.sim_setup = {"stop_time": 3600}
    >>> result_df = fmu_api.simulate()
    >>> fmu_api.close()
    >>> # Select an exemplary column
    >>> col = result_df.columns[0]
    >>> plt.plot(result_df[col], label=col)
    >>> _ = plt.legend()
    >>> _ = plt.show()

    .. versionadded:: 0.1.7
    """
    _items_to_drop = ["pool", "_fmu_instance", "_unzip_dir"]
    # In multiprocessing mode the instance and its unzip directory are
    # stored on the class (see `_setup_single_fmu_instance`), hence the
    # class-level defaults.
    _fmu_instance = None
    _unzip_dir: str = None
    _sim_setup_class: SimulationSetupClass = FMU_Setup
    # Python type of a model variable -> numpy dtype used for the
    # structured input array handed to fmpy.
    _type_map = {
        float: np.double,
        bool: np.bool_,
        int: np.int_
    }

    def __init__(self, model_name: Union[str, Path], working_directory: Union[str, Path] = None, **kwargs):
        """
        Validate the fmu path and initialise the simulation API.

        :raises ValueError:
            If `model_name` does not end with '.fmu' (case-insensitive).
        """
        # Init instance attributes
        self._model_description = None
        self._fmi_type = None
        self._unzip_dir = None
        self._fmu_instance = None
        self.log_fmu = kwargs.get("log_fmu", True)
        self._single_unzip_dir: str = None

        if isinstance(model_name, Path):
            model_name = str(model_name)
        if not model_name.lower().endswith(".fmu"):
            raise ValueError(f"{model_name} is not a valid fmu file!")
        if working_directory is None:
            working_directory = os.path.dirname(model_name)
        super().__init__(working_directory, model_name, **kwargs)
        # Make sure the fmu is released when the interpreter exits
        atexit.register(self.close)

    def _update_model(self):
        """Set up the fmu instance for the current model."""
        self.setup_fmu_instance()

    def close(self):
        """
        Close the fmu.

        Calls `close()` of the parent class first. In single-process
        mode, the fmu instance is then terminated and freed and the
        extracted fmu directory is removed. Calling the method on an
        already closed fmu is a no-op.

        :return: None
        """
        # Close MP of super class
        super().close()
        if self.use_mp:
            return
        if not self._fmu_instance:
            return  # Already closed
        self._single_close(fmu_instance=self._fmu_instance,
                           unzip_dir=self._unzip_dir)
        self._unzip_dir = None
        self._fmu_instance = None

    def _single_close(self, **kwargs):
        """
        Terminate and free one fmu instance and delete its files.

        Failures are logged instead of raised, so that closing
        is always attempted for every step.

        :keyword fmu_instance:
            The fmpy fmu instance to close.
        :keyword unzip_dir:
            Directory with the extracted fmu. Skipped if None.
        """
        fmu_instance = kwargs["fmu_instance"]
        unzip_dir = kwargs["unzip_dir"]
        try:
            fmu_instance.terminate()
        except Exception as error:  # This is due to fmpy which does not yield a narrow error
            self.logger.error("Could not terminate fmu instance: %s", error)
        try:
            fmu_instance.freeInstance()
        except OSError as error:
            self.logger.error("Could not free fmu instance: %s", error)
        # Remove the extracted files
        if unzip_dir is None:
            return
        try:
            shutil.rmtree(unzip_dir)
        except FileNotFoundError:
            pass  # Nothing to delete
        except PermissionError:
            self.logger.error("Could not delete unzipped fmu "
                              "in location %s. Delete it yourself.", unzip_dir)

    def _close_multiprocessing(self, _):
        """
        Close the fmu instance of the current worker.

        The single positional argument is ignored.
        """
        idx_worker = self.worker_idx
        if self._fmu_instance is None:
            return  # Already closed
        # NOTE(review): informational message logged at ERROR level -
        # presumably to make it visible regardless of the worker's
        # log level; confirm before lowering it.
        self.logger.error(f"Closing fmu for worker {idx_worker}")
        self._single_close(fmu_instance=self._fmu_instance,
                           unzip_dir=self._unzip_dir)
        # Reset both the instance and the class-level references
        self._unzip_dir = None
        self._fmu_instance = None
        FMU_API._unzip_dir = None
        FMU_API._fmu_instance = None

    def simulate(self,
                 parameters: Union[dict, List[dict]] = None,
                 return_option: str = "time_series",
                 **kwargs):
        """
        Simulate the fmu by delegating to `simulate()` of the parent class.

        See the docstring of the parent's simulate() for
        information on the arguments and further kwargs.

        Additional kwargs:

        :keyword str result_file_suffix:
            Suffix of the result file. Supported options can be extracted
            from the TimeSeriesData.save() function.
            Default is 'csv'.
        :keyword str parquet_engine:
            The engine to use for the data format parquet.
            Supported options can be extracted
            from the TimeSeriesData.save() function.
            Default is 'pyarrow'.

        """
        return super().simulate(parameters=parameters, return_option=return_option, **kwargs)

    def _single_simulation(self, kwargs):
        """
        Perform the single simulation for the given
        unzip directory and fmu_instance.
        See the docstring of simulate() for information on kwargs.

        The single argument kwarg is to make this
        function accessible by multiprocessing pool.map.

        :return:
            Depending on `return_option`: the path of the saved result
            file ('savepath'), a dict with the last values of all results
            ('last_point') or a TimeSeriesData object (any other value).
            None if the simulation failed and `fail_on_error` is False.
        :raises TypeError:
            If inputs are neither a DataFrame nor TimeSeriesData.
        :raises IndexError:
            If inputs contain a column named 'time'.
        """
        # Unpack kwargs:
        parameters = kwargs.pop("parameters", None)
        return_option = kwargs.pop("return_option", "time_series")
        inputs = kwargs.pop("inputs", None)
        fail_on_error = kwargs.pop("fail_on_error", True)
        result_file_name = kwargs.pop("result_file_name", "resultFile")
        result_file_suffix = kwargs.pop("result_file_suffix", "csv")
        parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
        savepath = kwargs.pop("savepath", None)
        if kwargs:
            self.logger.error(
                "You passed the following kwargs which "
                "are not part of the supported kwargs and "
                "have thus no effect: %s.", ", ".join(kwargs))

        # A worker may not have instantiated its fmu yet
        if self.use_mp and self._fmu_instance is None:
            self._setup_single_fmu_instance(use_mp=True)

        if inputs is not None:
            inputs = self._inputs_to_structured_array(inputs)
        if parameters is None:
            parameters = {}
        else:
            self.check_unsupported_variables(variables=list(parameters.keys()),
                                             type_of_var="parameters")
        try:
            # reset the FMU instance instead of creating a new one
            self._fmu_instance.reset()
            # Simulate
            res = fmpy.simulate_fmu(
                filename=self._unzip_dir,
                start_time=self.sim_setup.start_time,
                stop_time=self.sim_setup.stop_time,
                solver=self.sim_setup.solver,
                step_size=self.sim_setup.fixedstepsize,
                relative_tolerance=None,
                output_interval=self.sim_setup.output_interval,
                record_events=False,  # Used for an equidistant output
                start_values=parameters,
                apply_default_start_values=False,  # As we pass start_values already
                input=inputs,
                output=self.result_names,
                timeout=self.sim_setup.timeout,
                step_finished=None,
                model_description=self._model_description,
                fmu_instance=self._fmu_instance,
                fmi_type=self._fmi_type,
            )

        except Exception as error:
            self.logger.error("[SIMULATION ERROR] Error occurred while running FMU: \n %s", error)
            if fail_on_error:
                raise
            return None

        # Reshape result:
        df = pd.DataFrame(res).set_index("time")
        # Round the time index to the precision of the output interval
        # to get rid of floating point noise in the time stamps.
        df.index = np.round(df.index.astype("float64"),
                            self._count_decimal_places(self.sim_setup.output_interval))

        if return_option == "savepath":
            if savepath is None:
                savepath = self.working_directory

            os.makedirs(savepath, exist_ok=True)
            filepath = os.path.join(savepath, f"{result_file_name}.{result_file_suffix}")
            TimeSeriesData(df).droplevel(1, axis=1).save(
                filepath=filepath,
                key="simulation",
                engine=parquet_engine
            )

            return filepath
        if return_option == "last_point":
            return df.iloc[-1].to_dict()
        # Else return time series data
        tsd = TimeSeriesData(df, default_tag="sim")
        return tsd

    def _inputs_to_structured_array(self, inputs):
        """
        Convert the given input data to the structured
        numpy array which fmpy's simulate_fmu expects.

        :param TimeSeriesData,pd.DataFrame inputs:
            Input data. The index is assumed to contain the time.
        :return: np.ndarray
            Structured array with 'time' as the first field,
            followed by one field per input column.
        """
        if not isinstance(inputs, (TimeSeriesData, pd.DataFrame)):
            raise TypeError("DataFrame or TimeSeriesData object expected for inputs.")
        inputs = inputs.copy()  # Work on a copy, the caller's data stays untouched
        if isinstance(inputs, TimeSeriesData):
            inputs = inputs.to_df(force_single_index=True)
        if "time" in inputs.columns:
            raise IndexError(
                "Given inputs contain a column named 'time'. "
                "The index is assumed to contain the time-information."
            )
        inputs.insert(0, column="time", value=inputs.index)
        rows = [tuple(row) for row in inputs.to_numpy()]
        # Try to match the type, default is np.double.
        # 'time' is not part of self.inputs and thus handled separately.
        dtype = [(inputs.columns[0], np.double)]
        dtype += [
            (col, self._type_map.get(self.inputs[col].type, np.double))
            for col in inputs.columns[1:]
        ]
        return np.array(rows, dtype=dtype)

    @staticmethod
    def _count_decimal_places(value) -> int:
        """
        Return the number of decimal places in the string
        representation of `value`.

        Scientific notation is handled as well, e.g. 1e-05 yields 5.
        Values without a fractional part and non-finite values yield 0.
        """
        from decimal import Decimal, InvalidOperation

        try:
            exponent = Decimal(str(value)).as_tuple().exponent
        except InvalidOperation:
            return 0
        if not isinstance(exponent, int):
            return 0  # inf or nan, the exponent is a marker string
        return max(0, -exponent)

    def setup_fmu_instance(self):
        """
        Manually set up and extract the data to
        avoid this step in the simulate function.

        Extracts the fmu next to the working directory, reads the
        model description, registers all model variables as inputs,
        outputs, parameters or states and instantiates the fmu
        (once per worker if multiprocessing is used).
        """
        self.logger.info("Extracting fmu and reading fmu model description")
        # First load model description and extract variables
        self._single_unzip_dir = self.working_directory.joinpath(
            os.path.basename(self.model_name)[:-4] + "_extracted"
        )
        self._single_unzip_dir.mkdir(parents=True, exist_ok=True)
        self._single_unzip_dir = fmpy.extract(self.model_name,
                                              unzipdir=self._single_unzip_dir)
        self._model_description = read_model_description(self._single_unzip_dir,
                                                         validate=True)

        if self._model_description.coSimulation is None:
            self._fmi_type = 'ModelExchange'
        else:
            self._fmi_type = 'CoSimulation'

        self.logger.info("Reading model variables")

        _types = {
            "Enumeration": int,
            "Integer": int,
            "Real": float,
            "Boolean": bool,
            "String": str
        }
        # Extract inputs, outputs & tuner (lists from parent classes will be appended)
        for var in self._model_description.modelVariables:
            var_type = _types[var.type]
            if var.start is not None:
                var.start = self._cast_start_value(var.start, var_type)

            _var_ebcpy = Variable(
                min=var.min,
                max=var.max,
                value=var.start,
                type=var_type
            )
            if var.causality == 'input':
                self.inputs[var.name] = _var_ebcpy
            elif var.causality == 'output':
                self.outputs[var.name] = _var_ebcpy
            elif var.causality in ('parameter', 'calculatedParameter'):
                self.parameters[var.name] = _var_ebcpy
            elif var.causality == 'local':
                self.states[var.name] = _var_ebcpy
            else:
                self.logger.error(f"Could not map causality {var.causality}"
                                  f" to any variable type.")

        if self.use_mp:
            self.logger.info("Extracting fmu %s times for "
                             "multiprocessing on %s processes",
                             self.n_cpu, self.n_cpu)
            self.pool.map(
                self._setup_single_fmu_instance,
                [True] * self.n_cpu
            )
            self.logger.info("Instantiated fmu's on all processes.")
        else:
            self._setup_single_fmu_instance(use_mp=False)

    @staticmethod
    def _cast_start_value(start, target_type):
        """
        Cast the start value of a model variable to `target_type`.

        Boolean values given as text need explicit parsing, as
        bool("false") would evaluate to True.
        """
        if target_type is bool and isinstance(start, str):
            return start.strip().lower() in ("true", "1")
        return target_type(start)

    def _setup_single_fmu_instance(self, use_mp):
        """
        Instantiate the fmu for the current process.

        :param bool use_mp:
            If True, the fmu is extracted into a worker-specific
            directory and the instance is stored on the class.
            If False, the already extracted fmu is used and the
            instance is stored on this object.
        :return: bool
            Always True.
        """
        if use_mp:
            wrk_idx = self.worker_idx
            if self._fmu_instance is not None:
                return True
            # Each worker gets its own copy of the extracted fmu,
            # located next to the directory of the single extraction.
            base_dir = Path(self._single_unzip_dir)
            unzip_dir = base_dir.with_name(f"{base_dir.name}_worker_{wrk_idx}")
            fmpy.extract(self.model_name,
                         unzipdir=unzip_dir)
        else:
            wrk_idx = 0
            unzip_dir = self._single_unzip_dir

        self.logger.info("Instantiating fmu for worker %s", wrk_idx)
        fmu_instance = fmpy.instantiate_fmu(
            unzipdir=unzip_dir,
            model_description=self._model_description,
            fmi_type=self._fmi_type,
            visible=False,
            debug_logging=False,
            logger=self._custom_logger,
            fmi_call_logger=None)
        if use_mp:
            FMU_API._fmu_instance = fmu_instance
            FMU_API._unzip_dir = unzip_dir
        else:
            self._fmu_instance = fmu_instance
            self._unzip_dir = unzip_dir
        return True

    def _custom_logger(self, component, instanceName, status, category, message):
        """ Print the FMU's log messages to the command line (works for both FMI 1.0 and 2.0) """
        # pylint: disable=unused-argument, invalid-name
        if not self.log_fmu:
            return
        # Index = status code: OK, WARNING, DISCARD, ERROR, FATAL, PENDING
        levels = (logging.INFO, logging.WARNING, logging.WARNING,
                  logging.ERROR, logging.FATAL, logging.FATAL)
        if isinstance(message, bytes):
            # Never raise from within the logging callback due to odd bytes
            message = message.decode("utf-8", errors="replace")
        self.logger.log(level=levels[status], msg=message)

    def save_for_reproduction(self,
                              title: str,
                              path: Path = None,
                              files: list = None,
                              **kwargs):
        """
        Additionally to the basic reproduction, add info
        for FMU files.

        The fmu itself is appended to `files` as a `CopyFile`
        with the target name 'FMU/<name of the fmu>'. All arguments
        are passed on to the parent's save_for_reproduction().
        """
        if files is None:
            files = []
        fmu_path = Path(self.model_name)
        files.append(CopyFile(
            filename="FMU/" + fmu_path.name,
            sourcepath=fmu_path,
            remove=False
        ))
        return super().save_for_reproduction(
            title=title,
            path=path,
            files=files,
            **kwargs
        )