Coverage for ebcpy/simulationapi/fmu.py: 73%
177 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-26 09:12 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-26 09:12 +0000
1"""Module for classes using a fmu to
2simulate models."""
4import os
5import logging
6import atexit
7import shutil
8from pathlib import Path
9from typing import List, Union
11import fmpy
12from fmpy.model_description import read_model_description
13from pydantic import Field
14import pandas as pd
15import numpy as np
17from ebcpy import simulationapi, TimeSeriesData
18from ebcpy.simulationapi import SimulationSetup, SimulationSetupClass, Variable
19from ebcpy.utils.reproduction import CopyFile
21# pylint: disable=broad-except
class FMU_Setup(SimulationSetup):
    """
    Simulation setup used for FMU based simulations.

    Extends the basic `SimulationSetup` by a ``timeout`` option and
    sets the default solver name as well as the list of allowed
    solver names.
    """

    # Solver names checked against / used by the base setup class.
    _allowed_solvers = ["CVode", "Euler"]
    _default_solver = "CVode"

    # Without an explicit value the timeout is infinite.
    timeout: float = Field(
        default=np.inf,
        description="Timeout after which the simulation stops.",
        title="timeout",
    )
class FMU_API(simulationapi.SimulationAPI):
    """
    Class for simulation using the fmpy library and
    a functional mockup interface as a model input.

    :param str,Path model_name:
        Path to the .fmu model to be simulated.
    :param str,Path working_directory:
        Dirpath for the current working directory of simulation
        results. If None (default), the path of the fmu is used.
    :keyword bool log_fmu:
        Whether to print fmu messages or not.

    Example:

    >>> import matplotlib.pyplot as plt
    >>> from ebcpy import FMU_API
    >>> # Select any valid fmu. Replace the line below if
    >>> # you don't have this file on your device.
    >>> model_name = "Path to your fmu"
    >>> fmu_api = FMU_API(model_name)
    >>> fmu_api.sim_setup = {"stop_time": 3600}
    >>> result_df = fmu_api.simulate()
    >>> fmu_api.close()
    >>> # Select an exemplary column
    >>> col = result_df.columns[0]
    >>> plt.plot(result_df[col], label=col)
    >>> _ = plt.legend()
    >>> _ = plt.show()

    .. versionadded:: 0.1.7
    """
    _items_to_drop = ["pool", "_fmu_instance", "_unzip_dir"]
    # Class level slots; the multiprocessing setup below stores the
    # per-worker instance and unzip directory here.
    _fmu_instance = None
    _unzip_dir: str = None
    _sim_setup_class: SimulationSetupClass = FMU_Setup
    # Python type of a model variable -> numpy dtype used for input arrays.
    _type_map = {
        float: np.double,
        bool: np.bool_,
        int: np.int_
    }

    def __init__(self, model_name: Union[str, Path], working_directory: Union[str, Path] = None, **kwargs):
        """
        Instantiate class parameters.

        :param str,Path model_name:
            Path to the .fmu file.
        :param str,Path working_directory:
            Directory for results. If None, the directory of
            ``model_name`` is used.
        :keyword bool log_fmu:
            Whether fmu log messages are forwarded to the logger.
            Default is True.
        :raises ValueError:
            If ``model_name`` does not end with ``.fmu``.
        """
        # Init instance attributes
        self._model_description = None
        self._fmi_type = None
        self._unzip_dir = None
        self._fmu_instance = None
        self.log_fmu = kwargs.get("log_fmu", True)
        # Directory the fmu is extracted to; set in setup_fmu_instance
        self._single_unzip_dir: Union[Path, None] = None

        if isinstance(model_name, Path):
            model_name = str(model_name)
        if not model_name.lower().endswith(".fmu"):
            raise ValueError(f"{model_name} is not a valid fmu file!")
        if working_directory is None:
            working_directory = os.path.dirname(model_name)
        super().__init__(working_directory, model_name, **kwargs)
        # Register exit option
        atexit.register(self.close)

    def _update_model(self):
        """Set up the fmu instance for the current model."""
        self.setup_fmu_instance()

    def close(self):
        """
        Closes the fmu.

        First calls ``close()`` of the super class. If multiprocessing
        is not used and an fmu instance exists, the instance is
        terminated and freed and the extracted files are removed.
        Does not return a value.
        """
        # Close MP of super class
        super().close()
        if self.use_mp:
            return
        # Close if single process
        if not self._fmu_instance:
            return  # Already closed
        self._single_close(fmu_instance=self._fmu_instance,
                           unzip_dir=self._unzip_dir)
        self._unzip_dir = None
        self._fmu_instance = None

    def _single_close(self, **kwargs):
        """
        Terminate and free one fmu instance and delete its extracted files.

        :keyword fmu_instance:
            The fmu instance to terminate and free.
        :keyword unzip_dir:
            Directory of the extracted fmu. Skipped if None.
        """
        fmu_instance = kwargs["fmu_instance"]
        unzip_dir = kwargs["unzip_dir"]
        try:
            fmu_instance.terminate()
        except Exception as error:  # This is due to fmpy which does not yield a narrow error
            self.logger.error(f"Could not terminate fmu instance: {error}")
        try:
            fmu_instance.freeInstance()
        except OSError as error:
            self.logger.error(f"Could not free fmu instance: {error}")
        # Remove the extracted files
        if unzip_dir is None:
            return
        try:
            shutil.rmtree(unzip_dir)
        except FileNotFoundError:
            pass  # Nothing to delete
        except PermissionError:
            self.logger.error("Could not delete unzipped fmu "
                              "in location %s. Delete it yourself.", unzip_dir)

    def _close_multiprocessing(self, _):
        """
        Close the fmu instance of the current worker.

        The single (unused) argument allows calling this
        function through a pool map.
        """
        idx_worker = self.worker_idx
        if self._fmu_instance is None:
            return  # Already closed
        # Closing a worker is a regular event, hence info instead of error.
        self.logger.info(f"Closing fmu for worker {idx_worker}")
        self._single_close(fmu_instance=self._fmu_instance,
                           unzip_dir=self._unzip_dir)
        self._unzip_dir = None
        self._fmu_instance = None
        FMU_API._unzip_dir = None
        FMU_API._fmu_instance = None

    def simulate(self,
                 parameters: Union[dict, List[dict]] = None,
                 return_option: str = "time_series",
                 **kwargs):
        """
        Simulate the fmu by forwarding all arguments to ``simulate()``
        of the super class. See the docstring of the super class
        for information on the arguments and further kwargs.

        Additional kwargs:

        :keyword str result_file_suffix:
            Suffix of the result file. Supported options can be extracted
            from the save() accessor function.
            Default is 'csv'.
        :keyword str parquet_engine:
            The engine to use for the data format parquet.
            Supported options can be extracted
            from the save() accessor function.
            Default is 'pyarrow'.

        """
        return super().simulate(parameters=parameters, return_option=return_option, **kwargs)

    @staticmethod
    def _index_decimals(output_interval) -> int:
        """
        Return the number of decimals used to round the time index.

        The interval is formatted in positional notation, so small values
        such as 1e-05 give 5 decimals. Taking the position of '.' in
        ``str(output_interval)`` fails for scientific notation or integers,
        as the resulting -1 rounds the index to tens.
        At least one decimal is always kept.
        """
        positional = np.format_float_positional(float(output_interval), trim="-")
        fraction = positional.partition(".")[2]
        return max(1, len(fraction))

    def _inputs_to_structured_array(self, inputs):
        """
        Convert the given inputs to a structured numpy array
        as passed to fmpy's simulate_fmu.

        :param inputs:
            DataFrame or TimeSeriesData. The index is assumed
            to contain the time-information.
        :raises TypeError:
            If inputs is neither a DataFrame nor TimeSeriesData.
        :raises IndexError:
            If inputs already contain a column named 'time'.
        :return: np.ndarray
            Structured array with 'time' as first field.
        """
        if not isinstance(inputs, (TimeSeriesData, pd.DataFrame)):
            raise TypeError("DataFrame or TimeSeriesData object expected for inputs.")
        inputs = inputs.copy()  # Create save copy
        if isinstance(inputs, TimeSeriesData):
            inputs = inputs.to_df(force_single_index=True)
        if "time" in inputs.columns:
            raise IndexError(
                "Given inputs contain a column named 'time'. "
                "The index is assumed to contain the time-information."
            )
        inputs.insert(0, column="time", value=inputs.index)
        rows = [tuple(row) for row in inputs.to_numpy()]
        # Try to match the type, default is np.double.
        # 'time' is not in self.inputs and thus handled separately.
        dtype = [(inputs.columns[0], np.double)]
        dtype += [
            (col, self._type_map.get(self.inputs[col].type, np.double))
            for col in inputs.columns[1:]
        ]
        return np.array(rows, dtype=dtype)

    def _single_simulation(self, kwargs):
        """
        Perform the single simulation for the given
        unzip directory and fmu_instance.
        See the docstring of simulate() for information on kwargs.

        The single argument kwarg is to make this
        function accessible by multiprocessing pool.map.

        Keys read from kwargs: parameters, return_option, inputs,
        fail_on_error, result_file_name, result_file_suffix,
        parquet_engine and savepath. Any other key is reported
        through the logger and has no effect.

        :return:
            The path of the saved file for return_option 'savepath',
            a dict of the last row for 'last_point', else the
            DataFrame of results. None if the simulation failed
            and fail_on_error is False.
        """
        # Work on a copy: pop() must not empty the dict of the caller.
        kwargs = dict(kwargs)
        parameters = kwargs.pop("parameters", None)
        return_option = kwargs.pop("return_option", "time_series")
        inputs = kwargs.pop("inputs", None)
        fail_on_error = kwargs.pop("fail_on_error", True)
        result_file_name = kwargs.pop("result_file_name", "resultFile")
        result_file_suffix = kwargs.pop("result_file_suffix", "csv")
        parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
        savepath = kwargs.pop("savepath", None)
        if kwargs:
            self.logger.error(
                "You passed the following kwargs which "
                "are not part of the supported kwargs and "
                "have thus no effect: %s.", ", ".join(kwargs))

        if self.use_mp and self._fmu_instance is None:
            self._setup_single_fmu_instance(use_mp=True)

        if inputs is not None:
            # Convert df to structured numpy array for fmpy: simulate_fmu
            inputs = self._inputs_to_structured_array(inputs)
        if parameters is None:
            parameters = {}
        else:
            self.check_unsupported_variables(variables=list(parameters.keys()),
                                             type_of_var="parameters")
        try:
            # reset the FMU instance instead of creating a new one
            self._fmu_instance.reset()
            # Simulate
            res = fmpy.simulate_fmu(
                filename=self._unzip_dir,
                start_time=self.sim_setup.start_time,
                stop_time=self.sim_setup.stop_time,
                solver=self.sim_setup.solver,
                step_size=self.sim_setup.fixedstepsize,
                relative_tolerance=None,
                output_interval=self.sim_setup.output_interval,
                record_events=False,  # Used for an equidistant output
                start_values=parameters,
                apply_default_start_values=False,  # As we pass start_values already
                input=inputs,
                output=self.result_names,
                timeout=self.sim_setup.timeout,
                step_finished=None,
                model_description=self._model_description,
                fmu_instance=self._fmu_instance,
                fmi_type=self._fmi_type,
            )

        except Exception as error:
            self.logger.error(f"[SIMULATION ERROR] Error occurred while running FMU: \n {error}")
            if fail_on_error:
                raise
            return None

        # Reshape result:
        df = pd.DataFrame(res).set_index("time")
        df.index = np.round(df.index.astype("float64"),
                            self._index_decimals(self.sim_setup.output_interval))

        if return_option == "savepath":
            if savepath is None:
                savepath = self.working_directory

            os.makedirs(savepath, exist_ok=True)
            filepath = os.path.join(savepath, f"{result_file_name}.{result_file_suffix}")
            df.tsd.save(
                filepath=filepath,
                key="simulation",
                engine=parquet_engine
            )

            return filepath
        if return_option == "last_point":
            return df.iloc[-1].to_dict()
        # Else return time series data
        return df

    @staticmethod
    def _convert_start_value(start, target_type):
        """
        Convert the start value of a model variable to ``target_type``.

        Textual booleans are parsed explicitly, as ``bool("false")``
        is True for every non-empty string.
        """
        if target_type is bool and isinstance(start, str):
            return start.strip().lower() in ("true", "1")
        return target_type(start)

    def setup_fmu_instance(self):
        """
        Manually set up and extract the data to
        avoid this step in the simulate function.

        Extracts the fmu next to the working directory, reads the
        model description, sorts all model variables by causality
        into inputs, outputs, parameters and states and finally
        instantiates the fmu (once per process if multiprocessing
        is used).
        """
        self.logger.info("Extracting fmu and reading fmu model description")
        # First load model description and extract variables
        # [:-4] removes the '.fmu' suffix checked in __init__
        self._single_unzip_dir = self.working_directory.joinpath(
            os.path.basename(self.model_name)[:-4] + "_extracted")
        self._single_unzip_dir.mkdir(exist_ok=True)
        self._single_unzip_dir = fmpy.extract(self.model_name,
                                              unzipdir=self._single_unzip_dir)
        self._model_description = read_model_description(self._single_unzip_dir,
                                                         validate=True)

        if self._model_description.coSimulation is None:
            self._fmi_type = 'ModelExchange'
        else:
            self._fmi_type = 'CoSimulation'

        self.logger.info("Reading model variables")

        _types = {
            "Enumeration": int,
            "Integer": int,
            "Real": float,
            "Boolean": bool,
            "String": str
        }
        # Extract inputs, outputs & tuner (lists from parent classes will be appended)
        for var in self._model_description.modelVariables:
            var_type = _types[var.type]
            if var.start is not None:
                var.start = self._convert_start_value(var.start, var_type)

            _var_ebcpy = Variable(
                min=var.min,
                max=var.max,
                value=var.start,
                type=var_type
            )
            if var.causality == 'input':
                self.inputs[var.name] = _var_ebcpy
            elif var.causality == 'output':
                self.outputs[var.name] = _var_ebcpy
            elif var.causality in ('parameter', 'calculatedParameter'):
                self.parameters[var.name] = _var_ebcpy
            elif var.causality == 'local':
                self.states[var.name] = _var_ebcpy
            else:
                self.logger.error(f"Could not map causality {var.causality}"
                                  f" to any variable type.")

        if self.use_mp:
            self.logger.info("Extracting fmu %s times for "
                             "multiprocessing on %s processes",
                             self.n_cpu, self.n_cpu)
            self.pool.map(
                self._setup_single_fmu_instance,
                [True for _ in range(self.n_cpu)]
            )
            self.logger.info("Instantiated fmu's on all processes.")
        else:
            self._setup_single_fmu_instance(use_mp=False)

    def _setup_single_fmu_instance(self, use_mp):
        """
        Instantiate the fmu for the current process.

        :param bool use_mp:
            If True, the fmu is extracted into a worker specific
            directory and the instance is stored on the class.
            If False, the already extracted directory is used and
            the instance is stored on this object.
        :return: bool
            Always True.
        """
        if use_mp:
            wrk_idx = self.worker_idx
            if self._fmu_instance is not None:
                return True
            unzip_dir = self._single_unzip_dir.with_stem(
                self._single_unzip_dir.stem + f"_worker_{wrk_idx}")
            fmpy.extract(self.model_name,
                         unzipdir=unzip_dir)
        else:
            wrk_idx = 0
            unzip_dir = self._single_unzip_dir

        self.logger.info("Instantiating fmu for worker %s", wrk_idx)
        fmu_instance = fmpy.instantiate_fmu(
            unzipdir=unzip_dir,
            model_description=self._model_description,
            fmi_type=self._fmi_type,
            visible=False,
            debug_logging=False,
            logger=self._custom_logger,
            fmi_call_logger=None)
        if use_mp:
            FMU_API._fmu_instance = fmu_instance
            FMU_API._unzip_dir = unzip_dir
        else:
            self._fmu_instance = fmu_instance
            self._unzip_dir = unzip_dir
        return True

    def _custom_logger(self, component, instanceName, status, category, message):
        """ Print the FMU's log messages to the command line (works for both FMI 1.0 and 2.0) """
        # pylint: disable=unused-argument, invalid-name
        if not self.log_fmu:
            return
        label = ['OK', 'WARNING', 'DISCARD', 'ERROR', 'FATAL', 'PENDING'][status]
        _level_map = {'OK': logging.INFO,
                      'WARNING': logging.WARNING,
                      'DISCARD': logging.WARNING,
                      'ERROR': logging.ERROR,
                      'FATAL': logging.FATAL,
                      'PENDING': logging.FATAL}
        # Replace undecodable bytes so that logging a message never raises.
        self.logger.log(level=_level_map[label],
                        msg=message.decode("utf-8", errors="replace"))

    def save_for_reproduction(self,
                              title: str,
                              path: Path = None,
                              files: list = None,
                              **kwargs):
        """
        Additionally to the basic reproduction, add info
        for FMU files.

        :param str title:
            Passed on to the super class.
        :param Path path:
            Passed on to the super class.
        :param list files:
            Further files to save. The list itself is not modified,
            the fmu is appended to a copy.
        :return:
            The return value of ``save_for_reproduction``
            of the super class.
        """
        # Copy to avoid appending to the list of the caller
        files = [] if files is None else list(files)
        files.append(CopyFile(
            filename="FMU/" + Path(self.model_name).name,
            sourcepath=Path(self.model_name),
            remove=False
        ))
        return super().save_for_reproduction(
            title=title,
            path=path,
            files=files,
            **kwargs
        )