Coverage for ebcpy/simulationapi/fmu.py: 73%

178 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-08-20 12:54 +0000

1"""Module for classes using a fmu to 

2simulate models.""" 

3 

4import os 

5import logging 

6import atexit 

7import shutil 

8from pathlib import Path 

9from typing import List, Union 

10 

11import fmpy 

12from fmpy.model_description import read_model_description 

13from pydantic import Field 

14import pandas as pd 

15import numpy as np 

16 

17from ebcpy import simulationapi, TimeSeriesData 

18from ebcpy.simulationapi import SimulationSetup, SimulationSetupClass, Variable 

19from ebcpy.utils.reproduction import CopyFile 

20 

21# pylint: disable=broad-except 

22 

23 

class FMU_Setup(SimulationSetup):
    """
    Adds custom setup parameters for simulating FMUs
    to the basic ``SimulationSetup``.

    Adds a ``timeout`` field (default: no timeout) and restricts
    the allowed solvers to those supported by fmpy.
    """

    # Wall-clock limit passed to fmpy.simulate_fmu; np.inf means "no limit".
    timeout: float = Field(
        title="timeout",
        default=np.inf,
        description="Timeout after which the simulation stops."
    )

    # Solvers supported by fmpy for ModelExchange FMUs.
    _default_solver = "CVode"
    _allowed_solvers = ["CVode", "Euler"]

38 

39 

class FMU_API(simulationapi.SimulationAPI):
    """
    Class for simulation using the fmpy library and
    a functional mockup interface as a model input.

    :param str,Path model_name:
        Path to the .fmu model to be simulated.
    :param str,Path working_directory:
        Dirpath for the current working directory of simulation
        results. If None (default), the path of the fmu is used.
    :keyword bool log_fmu:
        Whether to print fmu messages or not.

    Example:

    >>> import matplotlib.pyplot as plt
    >>> from ebcpy import FMU_API
    >>> # Select any valid fmu. Replace the line below if
    >>> # you don't have this file on your device.
    >>> model_name = "Path to your fmu"
    >>> fmu_api = FMU_API(model_name)
    >>> fmu_api.sim_setup = {"stop_time": 3600}
    >>> result_df = fmu_api.simulate()
    >>> fmu_api.close()
    >>> # Select an exemplary column
    >>> col = result_df.columns[0]
    >>> plt.plot(result_df[col], label=col)
    >>> _ = plt.legend()
    >>> _ = plt.show()

    .. versionadded:: 0.1.7
    """
    # Attributes excluded when pickling for multiprocessing workers.
    _items_to_drop = ["pool", "_fmu_instance", "_unzip_dir"]
    # Class-level storage: in multiprocessing mode each worker process holds
    # its own fmu instance / unzip dir on the class object.
    _fmu_instance = None
    _unzip_dir: str = None
    _sim_setup_class: SimulationSetupClass = FMU_Setup
    # Maps Python types of declared model variables to the numpy dtypes
    # used when building the structured input array for fmpy.
    _type_map = {
        float: np.double,
        bool: np.bool_,
        int: np.int_
    }

    def __init__(self, model_name: Union[str, Path], working_directory: Union[str, Path] = None, **kwargs):
        """Instantiate class parameters"""
        # Init instance attributes
        self._model_description = None
        self._fmi_type = None
        self._unzip_dir = None
        self._fmu_instance = None
        self.log_fmu = kwargs.get("log_fmu", True)
        self._single_unzip_dir: str = None

        if isinstance(model_name, Path):
            model_name = str(model_name)
        if not model_name.lower().endswith(".fmu"):
            raise ValueError(f"{model_name} is not a valid fmu file!")
        # Default the working directory to the folder containing the fmu.
        if working_directory is None:
            working_directory = os.path.dirname(model_name)
        super().__init__(working_directory, model_name, **kwargs)
        # Register exit option so extracted files are cleaned up on interpreter exit.
        atexit.register(self.close)

    def _update_model(self):
        # Setup the fmu instance
        self.setup_fmu_instance()

    def close(self):
        """
        Closes the fmu.

        :return: bool
            True on success
        """
        # Close MP of super class
        super().close()
        # Close if single process
        if not self.use_mp:
            if not self._fmu_instance:
                return  # Already closed
            self._single_close(fmu_instance=self._fmu_instance,
                               unzip_dir=self._unzip_dir)
            self._unzip_dir = None
            self._fmu_instance = None

    def _single_close(self, **kwargs):
        """Terminate and free one fmu instance and delete its unzip directory."""
        fmu_instance = kwargs["fmu_instance"]
        unzip_dir = kwargs["unzip_dir"]
        try:
            fmu_instance.terminate()
        except Exception as error:  # This is due to fmpy which does not yield a narrow error
            self.logger.error(f"Could not terminate fmu instance: {error}")
        try:
            fmu_instance.freeInstance()
        except OSError as error:
            self.logger.error(f"Could not free fmu instance: {error}")
        # Remove the extracted files
        if unzip_dir is not None:
            try:
                shutil.rmtree(unzip_dir)
            except FileNotFoundError:
                pass  # Nothing to delete
            except PermissionError:
                self.logger.error("Could not delete unzipped fmu "
                                  "in location %s. Delete it yourself.", unzip_dir)

    def _close_multiprocessing(self, _):
        """Small helper function"""
        idx_worker = self.worker_idx
        if self._fmu_instance is None:
            return  # Already closed
        # NOTE(review): informational message logged at ERROR level — kept for
        # backwards compatibility with existing log filters; consider INFO.
        self.logger.error(f"Closing fmu for worker {idx_worker}")
        self._single_close(fmu_instance=self._fmu_instance,
                           unzip_dir=self._unzip_dir)
        self._unzip_dir = None
        self._fmu_instance = None
        # Also reset the class-level storage used by the worker processes.
        FMU_API._unzip_dir = None
        FMU_API._fmu_instance = None

    def simulate(self,
                 parameters: Union[dict, List[dict]] = None,
                 return_option: str = "time_series",
                 **kwargs):
        """
        Perform the single simulation for the given
        unzip directory and fmu_instance.
        See the docstring of simulate() for information on kwargs.

        Additional kwargs:

        :keyword str result_file_suffix:
            Suffix of the result file. Supported options can be extracted
            from the TimeSeriesData.save() function.
            Default is 'csv'.
        :keyword str parquet_engine:
            The engine to use for the data format parquet.
            Supported options can be extracted
            from the TimeSeriesData.save() function.
            Default is 'pyarrow'.

        """
        return super().simulate(parameters=parameters, return_option=return_option, **kwargs)

    def _single_simulation(self, kwargs):
        """
        Perform the single simulation for the given
        unzip directory and fmu_instance.
        See the docstring of simulate() for information on kwargs.

        The single argument kwarg is to make this
        function accessible by multiprocessing pool.map.
        """
        # Unpack kwargs:
        parameters = kwargs.pop("parameters", None)
        return_option = kwargs.pop("return_option", "time_series")
        inputs = kwargs.pop("inputs", None)
        fail_on_error = kwargs.pop("fail_on_error", True)
        result_file_name = kwargs.pop("result_file_name", "resultFile")
        result_file_suffix = kwargs.pop("result_file_suffix", "csv")
        parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
        savepath = kwargs.pop("savepath", None)
        if kwargs:
            self.logger.error(
                "You passed the following kwargs which "
                "are not part of the supported kwargs and "
                "have thus no effect: %s.", ", ".join(list(kwargs.keys())))

        # Worker processes lazily instantiate their own fmu on first use.
        if self.use_mp:
            if self._fmu_instance is None:
                self._setup_single_fmu_instance(use_mp=True)

        if inputs is not None:
            if not isinstance(inputs, (TimeSeriesData, pd.DataFrame)):
                raise TypeError("DataFrame or TimeSeriesData object expected for inputs.")
            inputs = inputs.copy()  # Create save copy
            if isinstance(inputs, TimeSeriesData):
                inputs = inputs.to_df(force_single_index=True)
            if "time" in inputs.columns:
                raise IndexError(
                    "Given inputs contain a column named 'time'. "
                    "The index is assumed to contain the time-information."
                )
            # Convert df to structured numpy array for fmpy: simulate_fmu
            inputs.insert(0, column="time", value=inputs.index)
            inputs_tuple = [tuple(columns) for columns in inputs.to_numpy()]
            # Try to match the type, default is np.double.
            # 'time' is not in inputs and thus handled separately.
            dtype = [(inputs.columns[0], np.double)] + \
                    [(col,
                      self._type_map.get(self.inputs[col].type, np.double)
                      ) for col in inputs.columns[1:]]
            inputs = np.array(inputs_tuple, dtype=dtype)
        if parameters is None:
            parameters = {}
        else:
            self.check_unsupported_variables(variables=list(parameters.keys()),
                                             type_of_var="parameters")
        try:
            # reset the FMU instance instead of creating a new one
            self._fmu_instance.reset()
            # Simulate
            res = fmpy.simulate_fmu(
                filename=self._unzip_dir,
                start_time=self.sim_setup.start_time,
                stop_time=self.sim_setup.stop_time,
                solver=self.sim_setup.solver,
                step_size=self.sim_setup.fixedstepsize,
                relative_tolerance=None,
                output_interval=self.sim_setup.output_interval,
                record_events=False,  # Used for an equidistant output
                start_values=parameters,
                apply_default_start_values=False,  # As we pass start_values already
                input=inputs,
                output=self.result_names,
                timeout=self.sim_setup.timeout,
                step_finished=None,
                model_description=self._model_description,
                fmu_instance=self._fmu_instance,
                fmi_type=self._fmi_type,
            )

        except Exception as error:
            self.logger.error(f"[SIMULATION ERROR] Error occurred while running FMU: \n {error}")
            if fail_on_error:
                raise  # bare raise preserves the original traceback
            return None

        # Reshape result: fmpy returns a structured array; index by time and
        # round the index to the number of decimals of the output interval
        # (derived from the decimal places of its string representation).
        df = pd.DataFrame(res).set_index("time")
        df.index = np.round(df.index.astype("float64"),
                            str(self.sim_setup.output_interval)[::-1].find('.'))

        if return_option == "savepath":
            if savepath is None:
                savepath = self.working_directory

            os.makedirs(savepath, exist_ok=True)
            filepath = os.path.join(savepath, f"{result_file_name}.{result_file_suffix}")
            TimeSeriesData(df).droplevel(1, axis=1).save(
                filepath=filepath,
                key="simulation",
                engine=parquet_engine
            )

            return filepath
        if return_option == "last_point":
            return df.iloc[-1].to_dict()
        # Else return time series data
        tsd = TimeSeriesData(df, default_tag="sim")
        return tsd

    def setup_fmu_instance(self):
        """
        Manually set up and extract the data to
        avoid this step in the simulate function.
        """
        self.logger.info("Extracting fmu and reading fmu model description")
        # First load model description and extract variables
        self._single_unzip_dir = self.working_directory.joinpath(os.path.basename(self.model_name)[:-4] + "_extracted")
        self._single_unzip_dir.mkdir(exist_ok=True)
        self._single_unzip_dir = fmpy.extract(self.model_name,
                                              unzipdir=self._single_unzip_dir)
        self._model_description = read_model_description(self._single_unzip_dir,
                                                         validate=True)

        # Prefer CoSimulation when the fmu supports it; otherwise ModelExchange.
        if self._model_description.coSimulation is None:
            self._fmi_type = 'ModelExchange'
        else:
            self._fmi_type = 'CoSimulation'

        self.logger.info("Reading model variables")

        _types = {
            "Enumeration": int,
            "Integer": int,
            "Real": float,
            "Boolean": bool,
            "String": str
        }
        # Extract inputs, outputs & tuner (lists from parent classes will be appended)
        for var in self._model_description.modelVariables:
            if var.start is not None:
                # Cast the start value from its string form to the declared type.
                var.start = _types[var.type](var.start)

            _var_ebcpy = Variable(
                min=var.min,
                max=var.max,
                value=var.start,
                type=_types[var.type]
            )
            if var.causality == 'input':
                self.inputs[var.name] = _var_ebcpy
            elif var.causality == 'output':
                self.outputs[var.name] = _var_ebcpy
            elif var.causality == 'parameter' or var.causality == 'calculatedParameter':
                self.parameters[var.name] = _var_ebcpy
            elif var.causality == 'local':
                self.states[var.name] = _var_ebcpy
            else:
                self.logger.error(f"Could not map causality {var.causality}"
                                  f" to any variable type.")

        if self.use_mp:
            self.logger.info("Extracting fmu %s times for "
                             "multiprocessing on %s processes",
                             self.n_cpu, self.n_cpu)
            self.pool.map(
                self._setup_single_fmu_instance,
                [True for _ in range(self.n_cpu)]
            )
            self.logger.info("Instantiated fmu's on all processes.")
        else:
            self._setup_single_fmu_instance(use_mp=False)

    def _setup_single_fmu_instance(self, use_mp):
        """
        Extract (for workers) and instantiate one fmu instance.

        :param bool use_mp:
            If True, a worker-specific extraction directory is created and
            the instance is stored on the class (per-process storage).
        :return: bool
            True on success (also when the worker instance already exists).
        """
        if use_mp:
            wrk_idx = self.worker_idx
            if self._fmu_instance is not None:
                return True
            # Fix: previous code called Path.with_stem(Path + str), which
            # raises a TypeError ('+' is unsupported between Path and str and
            # with_stem expects a stem string). Build the worker-specific
            # directory name by appending to the directory name instead.
            # fmpy.extract may return a plain str, so normalise to Path first.
            single_dir = Path(self._single_unzip_dir)
            unzip_dir = single_dir.with_name(single_dir.name + f"_worker_{wrk_idx}")
            fmpy.extract(self.model_name,
                         unzipdir=unzip_dir)
        else:
            wrk_idx = 0
            unzip_dir = self._single_unzip_dir

        self.logger.info("Instantiating fmu for worker %s", wrk_idx)
        fmu_instance = fmpy.instantiate_fmu(
            unzipdir=unzip_dir,
            model_description=self._model_description,
            fmi_type=self._fmi_type,
            visible=False,
            debug_logging=False,
            logger=self._custom_logger,
            fmi_call_logger=None)
        if use_mp:
            # Store on the class so each worker process keeps its own instance.
            FMU_API._fmu_instance = fmu_instance
            FMU_API._unzip_dir = unzip_dir
        else:
            self._fmu_instance = fmu_instance
            self._unzip_dir = unzip_dir
        return True

    def _custom_logger(self, component, instanceName, status, category, message):
        """ Print the FMU's log messages to the command line (works for both FMI 1.0 and 2.0) """
        # pylint: disable=unused-argument, invalid-name
        label = ['OK', 'WARNING', 'DISCARD', 'ERROR', 'FATAL', 'PENDING'][status]
        _level_map = {'OK': logging.INFO,
                      'WARNING': logging.WARNING,
                      'DISCARD': logging.WARNING,
                      'ERROR': logging.ERROR,
                      'FATAL': logging.FATAL,
                      'PENDING': logging.FATAL}
        if self.log_fmu:
            self.logger.log(level=_level_map[label], msg=message.decode("utf-8"))

    def save_for_reproduction(self,
                              title: str,
                              path: Path = None,
                              files: list = None,
                              **kwargs):
        """
        Additionally to the basic reproduction, add info
        for FMU files.
        """
        if files is None:
            files = []
        # Include a copy of the fmu itself so the run can be reproduced.
        files.append(CopyFile(
            filename="FMU/" + Path(self.model_name).name,
            sourcepath=Path(self.model_name),
            remove=False
        ))
        return super().save_for_reproduction(
            title=title,
            path=path,
            files=files,
            **kwargs
        )