Coverage for ebcpy/simulationapi/fmu.py: 73%

177 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-08-26 09:12 +0000

1"""Module for classes using a fmu to 

2simulate models.""" 

3 

4import os 

5import logging 

6import atexit 

7import shutil 

8from pathlib import Path 

9from typing import List, Union 

10 

11import fmpy 

12from fmpy.model_description import read_model_description 

13from pydantic import Field 

14import pandas as pd 

15import numpy as np 

16 

17from ebcpy import simulationapi, TimeSeriesData 

18from ebcpy.simulationapi import SimulationSetup, SimulationSetupClass, Variable 

19from ebcpy.utils.reproduction import CopyFile 

20 

21# pylint: disable=broad-except 

22 

23 

class FMU_Setup(SimulationSetup):
    """
    Extends the basic ``SimulationSetup`` with the extra
    settings needed when simulating FMUs.
    """

    # Wall-clock limit for a single simulation run; unlimited by default.
    timeout: float = Field(
        default=np.inf,
        title="timeout",
        description="Timeout after which the simulation stops."
    )

    # Solvers accepted by fmpy for model-exchange FMUs.
    _default_solver = "CVode"
    _allowed_solvers = ["CVode", "Euler"]

38 

39 

class FMU_API(simulationapi.SimulationAPI):
    """
    Class for simulation using the fmpy library and
    a functional mockup interface as a model input.

    :param str,Path model_name:
        Path to the .fmu model to be simulated.
    :param str,Path working_directory:
        Dirpath for the current working directory of simulation
        results. If None (default), the path of the fmu is used.
    :keyword bool log_fmu:
        Whether to print fmu messages or not.

    Example:

    >>> import matplotlib.pyplot as plt
    >>> from ebcpy import FMU_API
    >>> # Select any valid fmu. Replace the line below if
    >>> # you don't have this file on your device.
    >>> model_name = "Path to your fmu"
    >>> fmu_api = FMU_API(model_name)
    >>> fmu_api.sim_setup = {"stop_time": 3600}
    >>> result_df = fmu_api.simulate()
    >>> fmu_api.close()
    >>> # Select an exemplary column
    >>> col = result_df.columns[0]
    >>> plt.plot(result_df[col], label=col)
    >>> _ = plt.legend()
    >>> _ = plt.show()

    .. versionadded:: 0.1.7
    """
    # Attributes excluded from pickling when spawning multiprocessing
    # workers (the pool itself and the per-process fmu handles).
    _items_to_drop = ["pool", "_fmu_instance", "_unzip_dir"]
    # Class-level storage: each multiprocessing worker process holds its
    # own fmu instance and extraction directory in these class attributes.
    _fmu_instance = None
    _unzip_dir: str = None
    _sim_setup_class: SimulationSetupClass = FMU_Setup
    # Maps the python type of a model variable to the numpy dtype used
    # when building the structured input array for fmpy.simulate_fmu.
    _type_map = {
        float: np.double,
        bool: np.bool_,
        int: np.int_
    }

    def __init__(self, model_name: Union[str, Path], working_directory: Union[str, Path] = None, **kwargs):
        """Instantiate class parameters"""
        # Init instance attributes
        self._model_description = None
        self._fmi_type = None
        self._unzip_dir = None
        self._fmu_instance = None
        self.log_fmu = kwargs.get("log_fmu", True)
        self._single_unzip_dir: str = None

        if isinstance(model_name, Path):
            model_name = str(model_name)
        if not model_name.lower().endswith(".fmu"):
            raise ValueError(f"{model_name} is not a valid fmu file!")
        if working_directory is None:
            working_directory = os.path.dirname(model_name)
        super().__init__(working_directory, model_name, **kwargs)
        # Register exit option so extracted fmu files are cleaned up even
        # if the user never calls close() explicitly.
        atexit.register(self.close)

    def _update_model(self):
        # Setup the fmu instance
        self.setup_fmu_instance()

    def close(self):
        """
        Closes the fmu.

        :return: bool
            True on success
        """
        # Close MP of super class
        super().close()
        # Close if single process
        if not self.use_mp:
            if not self._fmu_instance:
                return  # Already closed
            self._single_close(fmu_instance=self._fmu_instance,
                               unzip_dir=self._unzip_dir)
            self._unzip_dir = None
            self._fmu_instance = None

    def _single_close(self, **kwargs):
        """Terminate and free one fmu instance and delete its extraction dir."""
        fmu_instance = kwargs["fmu_instance"]
        unzip_dir = kwargs["unzip_dir"]
        try:
            fmu_instance.terminate()
        except Exception as error:  # This is due to fmpy which does not yield a narrow error
            self.logger.error(f"Could not terminate fmu instance: {error}")
        try:
            fmu_instance.freeInstance()
        except OSError as error:
            self.logger.error(f"Could not free fmu instance: {error}")
        # Remove the extracted files
        if unzip_dir is not None:
            try:
                shutil.rmtree(unzip_dir)
            except FileNotFoundError:
                pass  # Nothing to delete
            except PermissionError:
                self.logger.error("Could not delete unzipped fmu "
                                  "in location %s. Delete it yourself.", unzip_dir)

    def _close_multiprocessing(self, _):
        """Small helper function"""
        idx_worker = self.worker_idx
        if self._fmu_instance is None:
            return  # Already closed
        # Routine shutdown notice, not an error condition: log at INFO.
        self.logger.info(f"Closing fmu for worker {idx_worker}")
        self._single_close(fmu_instance=self._fmu_instance,
                           unzip_dir=self._unzip_dir)
        self._unzip_dir = None
        self._fmu_instance = None
        # Also reset the class-level handles used by worker processes.
        FMU_API._unzip_dir = None
        FMU_API._fmu_instance = None

    def simulate(self,
                 parameters: Union[dict, List[dict]] = None,
                 return_option: str = "time_series",
                 **kwargs):
        """
        Simulate the FMU for the given parameters.

        Dispatches to the parent class, which handles multiprocessing and
        eventually calls ``_single_simulation`` for each parameter set.
        See the docstring of ``SimulationAPI.simulate()`` for information
        on the common arguments and kwargs.

        Additional kwargs:

        :keyword str result_file_suffix:
            Suffix of the result file. Supported options can be extracted
            from the save() accessor function.
            Default is 'csv'.
        :keyword str parquet_engine:
            The engine to use for the data format parquet.
            Supported options can be extracted
            from the save() accessor function.
            Default is 'pyarrow'.

        """
        return super().simulate(parameters=parameters, return_option=return_option, **kwargs)

    def _single_simulation(self, kwargs):
        """
        Perform the single simulation for the given
        unzip directory and fmu_instance.
        See the docstring of simulate() for information on kwargs.

        The single argument kwarg is to make this
        function accessible by multiprocessing pool.map.
        """
        # Unpack kwargs:
        parameters = kwargs.pop("parameters", None)
        return_option = kwargs.pop("return_option", "time_series")
        inputs = kwargs.pop("inputs", None)
        fail_on_error = kwargs.pop("fail_on_error", True)
        result_file_name = kwargs.pop("result_file_name", "resultFile")
        result_file_suffix = kwargs.pop("result_file_suffix", "csv")
        parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
        savepath = kwargs.pop("savepath", None)
        if kwargs:
            self.logger.error(
                "You passed the following kwargs which "
                "are not part of the supported kwargs and "
                "have thus no effect: %s.", ", ".join(list(kwargs.keys())))

        if self.use_mp:
            # Worker processes instantiate their fmu lazily on first use.
            if self._fmu_instance is None:
                self._setup_single_fmu_instance(use_mp=True)

        if inputs is not None:
            if not isinstance(inputs, (TimeSeriesData, pd.DataFrame)):
                raise TypeError("DataFrame or TimeSeriesData object expected for inputs.")
            inputs = inputs.copy()  # Create save copy
            if isinstance(inputs, TimeSeriesData):
                inputs = inputs.to_df(force_single_index=True)
            if "time" in inputs.columns:
                raise IndexError(
                    "Given inputs contain a column named 'time'. "
                    "The index is assumed to contain the time-information."
                )
            # Convert df to structured numpy array for fmpy: simulate_fmu
            inputs.insert(0, column="time", value=inputs.index)
            inputs_tuple = [tuple(columns) for columns in inputs.to_numpy()]
            # Try to match the type, default is np.double.
            # 'time' is not in inputs and thus handled separately.
            dtype = [(inputs.columns[0], np.double)] + \
                    [(col,
                      self._type_map.get(self.inputs[col].type, np.double)
                      ) for col in inputs.columns[1:]]
            inputs = np.array(inputs_tuple, dtype=dtype)
        if parameters is None:
            parameters = {}
        else:
            self.check_unsupported_variables(variables=list(parameters.keys()),
                                             type_of_var="parameters")
        try:
            # reset the FMU instance instead of creating a new one
            self._fmu_instance.reset()
            # Simulate
            res = fmpy.simulate_fmu(
                filename=self._unzip_dir,
                start_time=self.sim_setup.start_time,
                stop_time=self.sim_setup.stop_time,
                solver=self.sim_setup.solver,
                step_size=self.sim_setup.fixedstepsize,
                relative_tolerance=None,
                output_interval=self.sim_setup.output_interval,
                record_events=False,  # Used for an equidistant output
                start_values=parameters,
                apply_default_start_values=False,  # As we pass start_values already
                input=inputs,
                output=self.result_names,
                timeout=self.sim_setup.timeout,
                step_finished=None,
                model_description=self._model_description,
                fmu_instance=self._fmu_instance,
                fmi_type=self._fmi_type,
            )

        except Exception as error:
            self.logger.error(f"[SIMULATION ERROR] Error occurred while running FMU: \n {error}")
            if fail_on_error:
                raise error
            return None

        # Reshape result: round the time index to the number of decimal
        # digits of the output interval to avoid float artifacts.
        # max(..., 0) guards against find('.') == -1 (integer-formatted
        # interval), which would otherwise round to tens.
        df = pd.DataFrame(res).set_index("time")
        n_digits = str(self.sim_setup.output_interval)[::-1].find('.')
        df.index = np.round(df.index.astype("float64"), max(n_digits, 0))

        if return_option == "savepath":
            if savepath is None:
                savepath = self.working_directory

            os.makedirs(savepath, exist_ok=True)
            filepath = os.path.join(savepath, f"{result_file_name}.{result_file_suffix}")
            df.tsd.save(
                filepath=filepath,
                key="simulation",
                engine=parquet_engine
            )

            return filepath
        if return_option == "last_point":
            return df.iloc[-1].to_dict()
        # Else return time series data
        return df

    @staticmethod
    def _convert_start_value(target_type, value):
        """
        Cast a start value from the model description to its python type.

        fmpy yields start values as strings from the modelDescription.xml;
        a plain ``bool("false")`` would be truthy, so boolean strings are
        mapped explicitly.
        """
        if target_type is bool and isinstance(value, str):
            return value.strip().lower() in ("true", "1")
        return target_type(value)

    def setup_fmu_instance(self):
        """
        Manually set up and extract the data to
        avoid this step in the simulate function.
        """
        self.logger.info("Extracting fmu and reading fmu model description")
        # First load model description and extract variables
        self._single_unzip_dir = self.working_directory.joinpath(
            os.path.basename(self.model_name)[:-4] + "_extracted")
        self._single_unzip_dir.mkdir(exist_ok=True)
        self._single_unzip_dir = fmpy.extract(self.model_name,
                                              unzipdir=self._single_unzip_dir)
        self._model_description = read_model_description(self._single_unzip_dir,
                                                         validate=True)

        if self._model_description.coSimulation is None:
            self._fmi_type = 'ModelExchange'
        else:
            self._fmi_type = 'CoSimulation'

        self.logger.info("Reading model variables")

        # Maps FMI variable type names to python types.
        _types = {
            "Enumeration": int,
            "Integer": int,
            "Real": float,
            "Boolean": bool,
            "String": str
        }
        # Extract inputs, outputs & tuner (lists from parent classes will be appended)
        for var in self._model_description.modelVariables:
            if var.start is not None:
                # Use the helper: bool(<non-empty str>) would always be True.
                var.start = self._convert_start_value(_types[var.type], var.start)

            _var_ebcpy = Variable(
                min=var.min,
                max=var.max,
                value=var.start,
                type=_types[var.type]
            )
            if var.causality == 'input':
                self.inputs[var.name] = _var_ebcpy
            elif var.causality == 'output':
                self.outputs[var.name] = _var_ebcpy
            elif var.causality == 'parameter' or var.causality == 'calculatedParameter':
                self.parameters[var.name] = _var_ebcpy
            elif var.causality == 'local':
                self.states[var.name] = _var_ebcpy
            else:
                self.logger.error(f"Could not map causality {var.causality}"
                                  f" to any variable type.")

        if self.use_mp:
            self.logger.info("Extracting fmu %s times for "
                             "multiprocessing on %s processes",
                             self.n_cpu, self.n_cpu)
            self.pool.map(
                self._setup_single_fmu_instance,
                [True for _ in range(self.n_cpu)]
            )
            self.logger.info("Instantiated fmu's on all processes.")
        else:
            self._setup_single_fmu_instance(use_mp=False)

    def _setup_single_fmu_instance(self, use_mp):
        """
        Extract (for workers) and instantiate one fmu instance.

        :param bool use_mp:
            If True, the fmu is extracted into a worker-specific directory
            and stored on the class so each process keeps its own handle.
        :return: bool
            True on success.
        """
        if use_mp:
            wrk_idx = self.worker_idx
            if self._fmu_instance is not None:
                return True  # This worker already has an instance
            unzip_dir = self._single_unzip_dir.with_stem(
                self._single_unzip_dir.stem + f"_worker_{wrk_idx}")
            fmpy.extract(self.model_name,
                         unzipdir=unzip_dir)
        else:
            wrk_idx = 0
            unzip_dir = self._single_unzip_dir

        self.logger.info("Instantiating fmu for worker %s", wrk_idx)
        fmu_instance = fmpy.instantiate_fmu(
            unzipdir=unzip_dir,
            model_description=self._model_description,
            fmi_type=self._fmi_type,
            visible=False,
            debug_logging=False,
            logger=self._custom_logger,
            fmi_call_logger=None)
        if use_mp:
            # Store on the class so the handle survives per worker process.
            FMU_API._fmu_instance = fmu_instance
            FMU_API._unzip_dir = unzip_dir
        else:
            self._fmu_instance = fmu_instance
            self._unzip_dir = unzip_dir
        return True

    def _custom_logger(self, component, instanceName, status, category, message):
        """ Print the FMU's log messages to the command line (works for both FMI 1.0 and 2.0) """
        # pylint: disable=unused-argument, invalid-name
        label = ['OK', 'WARNING', 'DISCARD', 'ERROR', 'FATAL', 'PENDING'][status]
        _level_map = {'OK': logging.INFO,
                      'WARNING': logging.WARNING,
                      'DISCARD': logging.WARNING,
                      'ERROR': logging.ERROR,
                      'FATAL': logging.FATAL,
                      'PENDING': logging.FATAL}
        if self.log_fmu:
            self.logger.log(level=_level_map[label], msg=message.decode("utf-8"))

    def save_for_reproduction(self,
                              title: str,
                              path: Path = None,
                              files: list = None,
                              **kwargs):
        """
        Additionally to the basic reproduction, add info
        for FMU files.
        """
        if files is None:
            files = []
        files.append(CopyFile(
            filename="FMU/" + Path(self.model_name).name,
            sourcepath=Path(self.model_name),
            remove=False
        ))
        return super().save_for_reproduction(
            title=title,
            path=path,
            files=files,
            **kwargs
        )