Coverage for ebcpy/simulationapi/fmu.py: 72%
178 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-09-19 12:21 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-09-19 12:21 +0000
"""Module for classes that use an FMU
(Functional Mock-up Unit) to simulate models."""
4import os
5import logging
6import pathlib
7import atexit
8import shutil
9from typing import List, Union
11import fmpy
12from fmpy.model_description import read_model_description
13from pydantic import Field
14import pandas as pd
15import numpy as np
17from ebcpy import simulationapi, TimeSeriesData
18from ebcpy.simulationapi import SimulationSetup, SimulationSetupClass, Variable
19from ebcpy.utils.reproduction import CopyFile
21# pylint: disable=broad-except
class FMU_Setup(SimulationSetup):
    """
    Simulation setup for FMU based models.

    Extends the basic `SimulationSetup` by the
    FMU specific option ``timeout``.
    """

    # By default the simulation is never aborted due to a timeout.
    timeout: float = Field(
        default=np.inf,
        title="timeout",
        description="Timeout after which the simulation stops."
    )

    # Solver names for FMU simulations; how they are evaluated is defined
    # by `SimulationSetup`, which is not visible in this module.
    _default_solver = "CVode"
    _allowed_solvers = ["CVode", "Euler"]
class FMU_API(simulationapi.SimulationAPI):
    """
    Class for simulation using the fmpy library and
    a functional mockup interface as a model input.

    :param working_directory:
        Directory used for extracted fmu files and results.
        If None, the directory containing the fmu is used.
    :param model_name:
        Path (str or pathlib.Path) to the ``.fmu`` file.
    :keyword bool log_fmu:
        Whether to print fmu messages or not.

    Example:

    >>> import matplotlib.pyplot as plt
    >>> from ebcpy import FMU_API
    >>> # Select any valid fmu. Replace the line below if
    >>> # you don't have this file on your device.
    >>> model_name = "Path to your fmu"
    >>> fmu_api = FMU_API(working_directory=None, model_name=model_name)
    >>> fmu_api.sim_setup = {"stop_time": 3600}
    >>> result_df = fmu_api.simulate()
    >>> fmu_api.close()
    >>> # Select an exemplary column
    >>> col = result_df.columns[0]
    >>> plt.plot(result_df[col], label=col)
    >>> _ = plt.legend()
    >>> _ = plt.show()

    .. versionadded:: 0.1.7
    """
    _items_to_drop = ["pool", "_fmu_instance", "_unzip_dir"]
    # Class-level slots: in multiprocessing mode the instance and its
    # unzip directory are stored on the class (see _setup_single_fmu_instance).
    _fmu_instance = None
    _unzip_dir: str = None
    _sim_setup_class: SimulationSetupClass = FMU_Setup
    # Maps python types of model variables to numpy dtypes for fmpy inputs
    _type_map = {
        float: np.double,
        bool: np.bool_,
        int: np.int_
    }

    def __init__(self, working_directory, model_name, **kwargs):
        """
        Instantiate class parameters.

        :raises ValueError: If `model_name` does not end with ``.fmu``.
        """
        # Init instance attributes
        self._model_description = None
        self._fmi_type = None
        self._unzip_dir = None
        self._fmu_instance = None
        self.log_fmu = kwargs.get("log_fmu", True)
        self._single_unzip_dir: str = None

        if isinstance(model_name, pathlib.Path):
            model_name = str(model_name)
        if not model_name.lower().endswith(".fmu"):
            raise ValueError(f"{model_name} is not a valid fmu file!")
        if working_directory is None:
            working_directory = os.path.dirname(model_name)
        super().__init__(working_directory, model_name, **kwargs)
        # Register exit option so the fmu is released on interpreter exit
        atexit.register(self.close)

    def _update_model(self):
        """Set up the fmu instance for the current model."""
        self.setup_fmu_instance()

    def close(self):
        """
        Closes the fmu.

        Calls ``close()`` of the parent class first. In single-process
        mode, the fmu instance is terminated and freed and the
        extracted files are removed. Calling it again on an already
        closed single-process fmu does nothing further.

        :return: None
        """
        # Close MP of super class
        super().close()
        # Close if single process
        if not self.use_mp:
            if not self._fmu_instance:
                return  # Already closed
            self._single_close(fmu_instance=self._fmu_instance,
                               unzip_dir=self._unzip_dir)
            self._unzip_dir = None
            self._fmu_instance = None

    def _single_close(self, **kwargs):
        """
        Terminate and free one fmu instance and delete its extracted files.

        :keyword fmu_instance:
            The fmpy fmu instance to terminate and free.
        :keyword unzip_dir:
            Directory of the extracted fmu. Skipped if None.
        """
        fmu_instance = kwargs["fmu_instance"]
        unzip_dir = kwargs["unzip_dir"]
        try:
            fmu_instance.terminate()
        except Exception as error:  # This is due to fmpy which does not yield a narrow error
            self.logger.error(f"Could not terminate fmu instance: {error}")
        try:
            fmu_instance.freeInstance()
        except OSError as error:
            self.logger.error(f"Could not free fmu instance: {error}")
        # Remove the extracted files
        if unzip_dir is not None:
            try:
                shutil.rmtree(unzip_dir)
            except FileNotFoundError:
                pass  # Nothing to delete
            except PermissionError:
                self.logger.error("Could not delete unzipped fmu "
                                  "in location %s. Delete it yourself.", unzip_dir)

    def _close_multiprocessing(self, _):
        """
        Small helper function to close the fmu of the current worker.

        The single positional argument is ignored.
        """
        idx_worker = self.worker_idx
        if self._fmu_instance is None:
            return  # Already closed
        # Closing is a regular event, hence info instead of error level.
        self.logger.info(f"Closing fmu for worker {idx_worker}")
        self._single_close(fmu_instance=self._fmu_instance,
                           unzip_dir=self._unzip_dir)
        self._unzip_dir = None
        self._fmu_instance = None
        # In multiprocessing mode the instance is stored on the class
        FMU_API._unzip_dir = None
        FMU_API._fmu_instance = None

    def simulate(self,
                 parameters: Union[dict, List[dict]] = None,
                 return_option: str = "time_series",
                 **kwargs):
        """
        Simulate the fmu.

        All arguments are forwarded unchanged to ``simulate()`` of the
        parent class; see its docstring for the general options.

        :param parameters:
            Parameter values to simulate, either a single dict
            or a list of dicts.
        :param str return_option:
            `_single_simulation` distinguishes 'savepath',
            'last_point' and, for any other value, a time series.
            Default is 'time_series'.

        Additional kwargs:

        :keyword str result_file_suffix:
            Suffix of the result file. Supported options can be extracted
            from the TimeSeriesData.save() function.
            Default is 'csv'.
        :keyword str parquet_engine:
            The engine to use for the data format parquet.
            Supported options can be extracted
            from the TimeSeriesData.save() function.
            Default is 'pyarrow'.

        :return: The result of the parent class' ``simulate()``.
        """
        return super().simulate(parameters=parameters, return_option=return_option, **kwargs)

    @staticmethod
    def _get_n_decimals(interval) -> int:
        """
        Return the number of decimal places in the text form of `interval`.

        For plain decimal notation this equals the number of characters
        after the decimal point (``0.01`` -> 2, ``1.0`` -> 1). Integers
        (``1`` -> 0) and scientific notation (``1e-05`` -> 5) are
        handled as well. Values that cannot be interpreted yield 0.
        """
        # Local import keeps this helper independent of module-level imports
        from decimal import Decimal, InvalidOperation
        try:
            exponent = Decimal(str(interval)).as_tuple().exponent
        except InvalidOperation:
            return 0
        if not isinstance(exponent, int):
            return 0  # inf or nan have a non-numeric exponent
        return max(0, -exponent)

    @staticmethod
    def _convert_start_value(value, target_type):
        """
        Convert the start value of a model variable to `target_type`.

        Booleans given as text need explicit parsing,
        because ``bool("false")`` is True in python.
        """
        if target_type is bool and isinstance(value, str):
            return value.strip().lower() in ("true", "1")
        return target_type(value)

    def _single_simulation(self, kwargs):
        """
        Perform the single simulation for the given
        unzip directory and fmu_instance.
        See the docstring of simulate() for information on kwargs.

        The single argument kwarg is to make this
        function accessible by multiprocessing pool.map.

        Keys read from `kwargs` (each is removed from the dict):
        ``parameters``, ``return_option``, ``inputs``, ``fail_on_error``,
        ``result_file_name``, ``result_file_suffix``, ``parquet_engine``
        and ``savepath``. Remaining keys are reported and ignored.

        :return:
            The path of the saved file for return_option 'savepath',
            a dict of the last values for 'last_point', otherwise a
            TimeSeriesData object. None if the simulation fails and
            ``fail_on_error`` is False.
        :raises TypeError: If inputs are neither DataFrame nor TimeSeriesData.
        :raises IndexError: If inputs contain a column named 'time'.
        """
        # Unpack kwargs:
        parameters = kwargs.pop("parameters", None)
        return_option = kwargs.pop("return_option", "time_series")
        inputs = kwargs.pop("inputs", None)
        fail_on_error = kwargs.pop("fail_on_error", True)
        result_file_name = kwargs.pop("result_file_name", "resultFile")
        result_file_suffix = kwargs.pop("result_file_suffix", "csv")
        parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
        savepath = kwargs.pop("savepath", None)
        if kwargs:
            self.logger.error(
                "You passed the following kwargs which "
                "are not part of the supported kwargs and "
                "have thus no effect: %s.", ", ".join(kwargs))

        if self.use_mp:
            if self._fmu_instance is None:
                self._setup_single_fmu_instance(use_mp=True)

        if inputs is not None:
            if not isinstance(inputs, (TimeSeriesData, pd.DataFrame)):
                raise TypeError("DataFrame or TimeSeriesData object expected for inputs.")
            inputs = inputs.copy()  # Create save copy
            if isinstance(inputs, TimeSeriesData):
                inputs = inputs.to_df(force_single_index=True)
            if "time" in inputs.columns:
                raise IndexError(
                    "Given inputs contain a column named 'time'. "
                    "The index is assumed to contain the time-information."
                )
            # Convert df to structured numpy array for fmpy: simulate_fmu
            inputs.insert(0, column="time", value=inputs.index)
            inputs_tuple = [tuple(row) for row in inputs.to_numpy()]
            # Try to match the type, default is np.double.
            # 'time' is not in self.inputs and thus handled separately.
            dtype = [(inputs.columns[0], np.double)] + \
                    [(col,
                      self._type_map.get(self.inputs[col].type, np.double)
                      ) for col in inputs.columns[1:]]
            inputs = np.array(inputs_tuple, dtype=dtype)
        if parameters is None:
            parameters = {}
        else:
            self.check_unsupported_variables(variables=list(parameters.keys()),
                                             type_of_var="parameters")
        try:
            # reset the FMU instance instead of creating a new one
            self._fmu_instance.reset()
            # Simulate
            res = fmpy.simulate_fmu(
                filename=self._unzip_dir,
                start_time=self.sim_setup.start_time,
                stop_time=self.sim_setup.stop_time,
                solver=self.sim_setup.solver,
                step_size=self.sim_setup.fixedstepsize,
                relative_tolerance=None,
                output_interval=self.sim_setup.output_interval,
                record_events=False,  # Used for an equidistant output
                start_values=parameters,
                apply_default_start_values=False,  # As we pass start_values already
                input=inputs,
                output=self.result_names,
                timeout=self.sim_setup.timeout,
                step_finished=None,
                model_description=self._model_description,
                fmu_instance=self._fmu_instance,
                fmi_type=self._fmi_type,
            )

        except Exception as error:
            self.logger.error(f"[SIMULATION ERROR] Error occurred while running FMU: \n {error}")
            if fail_on_error:
                raise  # Bare raise keeps the original traceback
            return None

        # Reshape result:
        df = pd.DataFrame(res).set_index("time")
        # Round the time index to the precision of the output interval
        df.index = np.round(df.index.astype("float64"),
                            self._get_n_decimals(self.sim_setup.output_interval))

        if return_option == "savepath":
            if savepath is None:
                savepath = self.working_directory

            os.makedirs(savepath, exist_ok=True)
            filepath = os.path.join(savepath, f"{result_file_name}.{result_file_suffix}")
            TimeSeriesData(df).droplevel(1, axis=1).save(
                filepath=filepath,
                key="simulation",
                engine=parquet_engine
            )

            return filepath
        if return_option == "last_point":
            return df.iloc[-1].to_dict()
        # Else return time series data
        tsd = TimeSeriesData(df, default_tag="sim")
        return tsd

    def setup_fmu_instance(self):
        """
        Manually set up and extract the data to
        avoid this step in the simulate function.

        Extracts the fmu into the working directory, reads the model
        description, registers all model variables as inputs, outputs,
        parameters or states and instantiates the fmu (once per
        process if multiprocessing is used).
        """
        self.logger.info("Extracting fmu and reading fmu model description")
        # First load model description and extract variables
        self._single_unzip_dir = os.path.join(self.working_directory,
                                              os.path.basename(self.model_name)[:-4] + "_extracted")
        os.makedirs(self._single_unzip_dir, exist_ok=True)
        self._single_unzip_dir = fmpy.extract(self.model_name,
                                              unzipdir=self._single_unzip_dir)
        self._model_description = read_model_description(self._single_unzip_dir,
                                                         validate=True)

        if self._model_description.coSimulation is None:
            self._fmi_type = 'ModelExchange'
        else:
            self._fmi_type = 'CoSimulation'

        self.logger.info("Reading model variables")

        _types = {
            "Enumeration": int,
            "Integer": int,
            "Real": float,
            "Boolean": bool,
            "String": str
        }
        # Extract inputs, outputs & tuner (lists from parent classes will be appended)
        for var in self._model_description.modelVariables:
            if var.start is not None:
                var.start = self._convert_start_value(var.start, _types[var.type])

            _var_ebcpy = Variable(
                min=var.min,
                max=var.max,
                value=var.start,
                type=_types[var.type]
            )
            if var.causality == 'input':
                self.inputs[var.name] = _var_ebcpy
            elif var.causality == 'output':
                self.outputs[var.name] = _var_ebcpy
            elif var.causality in ('parameter', 'calculatedParameter'):
                self.parameters[var.name] = _var_ebcpy
            elif var.causality == 'local':
                self.states[var.name] = _var_ebcpy
            else:
                self.logger.error(f"Could not map causality {var.causality}"
                                  f" to any variable type.")

        if self.use_mp:
            self.logger.info("Extracting fmu %s times for "
                             "multiprocessing on %s processes",
                             self.n_cpu, self.n_cpu)
            self.pool.map(
                self._setup_single_fmu_instance,
                [True] * self.n_cpu
            )
            self.logger.info("Instantiated fmu's on all processes.")
        else:
            self._setup_single_fmu_instance(use_mp=False)

    def _setup_single_fmu_instance(self, use_mp):
        """
        Instantiate the fmu for the current process.

        :param bool use_mp:
            If True, the fmu is extracted into a worker specific
            directory and the instance is stored on the class.
            If False, the already extracted directory is used and
            the instance is stored on this object.
        :return: bool
            Always True.
        """
        if use_mp:
            wrk_idx = self.worker_idx
            if self._fmu_instance is not None:
                return True  # This worker is already set up
            unzip_dir = self._single_unzip_dir + f"_worker_{wrk_idx}"
            fmpy.extract(self.model_name,
                         unzipdir=unzip_dir)
        else:
            wrk_idx = 0
            unzip_dir = self._single_unzip_dir

        self.logger.info("Instantiating fmu for worker %s", wrk_idx)
        fmu_instance = fmpy.instantiate_fmu(
            unzipdir=unzip_dir,
            model_description=self._model_description,
            fmi_type=self._fmi_type,
            visible=False,
            debug_logging=False,
            logger=self._custom_logger,
            fmi_call_logger=None)
        if use_mp:
            FMU_API._fmu_instance = fmu_instance
            FMU_API._unzip_dir = unzip_dir
        else:
            self._fmu_instance = fmu_instance
            self._unzip_dir = unzip_dir
        return True

    def _custom_logger(self, component, instanceName, status, category, message):
        """ Print the FMU's log messages to the command line (works for both FMI 1.0 and 2.0) """
        # pylint: disable=unused-argument, invalid-name
        # `status` is used as an index into the list of status labels
        label = ['OK', 'WARNING', 'DISCARD', 'ERROR', 'FATAL', 'PENDING'][status]
        _level_map = {'OK': logging.INFO,
                      'WARNING': logging.WARNING,
                      'DISCARD': logging.WARNING,
                      'ERROR': logging.ERROR,
                      'FATAL': logging.FATAL,
                      'PENDING': logging.FATAL}
        if self.log_fmu:
            self.logger.log(level=_level_map[label], msg=message.decode("utf-8"))

    def save_for_reproduction(self,
                              title: str,
                              path: pathlib.Path = None,
                              files: list = None,
                              **kwargs):
        """
        Additionally to the basic reproduction, add info
        for FMU files.

        :param str title:
            Passed on to the parent class.
        :param pathlib.Path path:
            Passed on to the parent class.
        :param list files:
            Further files to save. The fmu file is added to a copy
            of this list, the given list itself is not modified.
        :return: The result of the parent class' ``save_for_reproduction()``.
        """
        # Copy to avoid modifying the list of the caller
        files = [] if files is None else list(files)
        files.append(CopyFile(
            filename="FMU/" + pathlib.Path(self.model_name).name,
            sourcepath=pathlib.Path(self.model_name),
            remove=False
        ))
        return super().save_for_reproduction(
            title=title,
            path=path,
            files=files,
            **kwargs
        )