Coverage for ebcpy/simulationapi/fmu.py: 72%

178 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-09-19 12:21 +0000

1"""Module for classes using a fmu to 

2simulate models.""" 

3 

4import os 

5import logging 

6import pathlib 

7import atexit 

8import shutil 

9from typing import List, Union 

10 

11import fmpy 

12from fmpy.model_description import read_model_description 

13from pydantic import Field 

14import pandas as pd 

15import numpy as np 

16 

17from ebcpy import simulationapi, TimeSeriesData 

18from ebcpy.simulationapi import SimulationSetup, SimulationSetupClass, Variable 

19from ebcpy.utils.reproduction import CopyFile 

20 

21# pylint: disable=broad-except 

22 

23 

class FMU_Setup(SimulationSetup):
    """
    Adds custom setup parameters for simulating FMUs
    to the basic `SimulationSetup`.
    """

    # Additional setup field; it is forwarded as ``timeout`` to
    # ``fmpy.simulate_fmu`` by ``FMU_API._single_simulation``.
    timeout: float = Field(
        title="timeout",
        default=np.inf,
        description="Timeout after which the simulation stops."
    )

    # Solver names for FMU simulations.
    # NOTE(review): presumably evaluated by the parent `SimulationSetup`
    # when validating the `solver` field - confirm in the parent class.
    _default_solver = "CVode"
    _allowed_solvers = ["CVode", "Euler"]

38 

39 

class FMU_API(simulationapi.SimulationAPI):
    """
    Class for simulation using the fmpy library and
    a functional mockup interface as a model input.

    :param working_directory:
        Working directory passed on to the parent class. If None,
        the directory containing ``model_name`` is used.
    :param model_name:
        Path (str or pathlib.Path) to the fmu file.
        Must end with ``.fmu`` (case-insensitive).
    :keyword bool log_fmu:
        Whether to print fmu messages or not.

    Example:

    >>> import matplotlib.pyplot as plt
    >>> from ebcpy import FMU_API
    >>> # Select any valid fmu. Replace the line below if
    >>> # you don't have this file on your device.
    >>> model_name = "Path to your fmu"
    >>> fmu_api = FMU_API(working_directory=None, model_name=model_name)
    >>> fmu_api.sim_setup = {"stop_time": 3600}
    >>> result_df = fmu_api.simulate()
    >>> fmu_api.close()
    >>> # Select an exemplary column
    >>> col = result_df.columns[0]
    >>> plt.plot(result_df[col], label=col)
    >>> _ = plt.legend()
    >>> _ = plt.show()

    .. versionadded:: 0.1.7
    """
    # NOTE(review): presumably the attributes the parent class excludes
    # when the object is handed to worker processes - confirm in parent.
    _items_to_drop = ["pool", "_fmu_instance", "_unzip_dir"]
    # Class-level slots: in multiprocessing mode every worker process stores
    # its own fmu instance / unzip directory here
    # (see _setup_single_fmu_instance and _close_multiprocessing).
    _fmu_instance = None
    _unzip_dir: str = None
    _sim_setup_class: SimulationSetupClass = FMU_Setup
    # Python type of an input variable -> numpy dtype of its column in the
    # structured input array passed to fmpy.
    _type_map = {
        float: np.double,
        bool: np.bool_,
        int: np.int_
    }
    # Logging level per FMI status code, indexed by the integer status:
    # OK, WARNING, DISCARD, ERROR, FATAL, PENDING.
    _FMI_STATUS_LEVELS = (
        logging.INFO,
        logging.WARNING,
        logging.WARNING,
        logging.ERROR,
        logging.FATAL,
        logging.FATAL,
    )

    def __init__(self, working_directory, model_name, **kwargs):
        """
        Instantiate class parameters.

        :raises ValueError:
            If ``model_name`` does not end with ``.fmu``.
        """
        # Init instance attributes
        self._model_description = None
        self._fmi_type = None
        self._unzip_dir = None
        self._fmu_instance = None
        self.log_fmu = kwargs.get("log_fmu", True)
        self._single_unzip_dir: str = None

        if isinstance(model_name, pathlib.Path):
            model_name = str(model_name)
        if not model_name.lower().endswith(".fmu"):
            raise ValueError(f"{model_name} is not a valid fmu file!")
        if working_directory is None:
            working_directory = os.path.dirname(model_name)
        super().__init__(working_directory, model_name, **kwargs)
        # Register exit option
        atexit.register(self.close)

    def _update_model(self):
        """Set up the fmu instance for the current model."""
        self.setup_fmu_instance()

    def close(self):
        """
        Closes the fmu.

        The parent class' ``close()`` is called first. If multiprocessing
        is not used, the fmu instance is afterwards terminated and freed
        and the extracted files are removed. Calling this function
        repeatedly is safe; an already closed fmu is skipped.

        NOTE(review): in multiprocessing mode the base extraction directory
        created by ``setup_fmu_instance`` does not appear to be removed
        here - confirm whether this is intended.

        :return: None
        """
        # Close MP of super class
        super().close()
        if self.use_mp:
            return
        # Close if single process
        if not self._fmu_instance:
            return  # Already closed
        self._single_close(fmu_instance=self._fmu_instance,
                           unzip_dir=self._unzip_dir)
        self._unzip_dir = None
        self._fmu_instance = None

    def _single_close(self, **kwargs):
        """
        Terminate and free one fmu instance and delete its extracted files.

        Failures are logged instead of raised, so that closing
        always runs through all steps.

        :keyword fmu_instance:
            The fmpy fmu instance to close.
        :keyword unzip_dir:
            Directory with the extracted fmu. Not deleted if None.
        """
        fmu_instance = kwargs["fmu_instance"]
        unzip_dir = kwargs["unzip_dir"]
        try:
            fmu_instance.terminate()
        except Exception as error:  # This is due to fmpy which does not yield a narrow error
            self.logger.error(f"Could not terminate fmu instance: {error}")
        try:
            fmu_instance.freeInstance()
        except OSError as error:
            self.logger.error(f"Could not free fmu instance: {error}")
        # Remove the extracted files
        if unzip_dir is not None:
            try:
                shutil.rmtree(unzip_dir)
            except FileNotFoundError:
                pass  # Nothing to delete
            except PermissionError:
                self.logger.error("Could not delete unzipped fmu "
                                  "in location %s. Delete it yourself.", unzip_dir)

    def _close_multiprocessing(self, _):
        """
        Small helper function to close the fmu of the current worker.

        The single, unused argument makes the function
        usable with a multiprocessing pool map.
        """
        idx_worker = self.worker_idx
        if self._fmu_instance is None:
            return  # Already closed
        # Closing is a regular event, hence info and not error level.
        self.logger.info(f"Closing fmu for worker {idx_worker}")
        self._single_close(fmu_instance=self._fmu_instance,
                           unzip_dir=self._unzip_dir)
        self._unzip_dir = None
        self._fmu_instance = None
        # Also reset the class-level slots used in multiprocessing mode
        FMU_API._unzip_dir = None
        FMU_API._fmu_instance = None

    def simulate(self,
                 parameters: Union[dict, List[dict]] = None,
                 return_option: str = "time_series",
                 **kwargs):
        """
        Simulate the fmu. All arguments are passed on unchanged to the
        ``simulate()`` function of the parent class; see its docstring
        for information on the general arguments and kwargs.

        Additional kwargs:

        :keyword str result_file_suffix:
            Suffix of the result file. Supported options can be extracted
            from the TimeSeriesData.save() function.
            Default is 'csv'.
        :keyword str parquet_engine:
            The engine to use for the data format parquet.
            Supported options can be extracted
            from the TimeSeriesData.save() function.
            Default is 'pyarrow'.

        """
        return super().simulate(parameters=parameters, return_option=return_option, **kwargs)

    @staticmethod
    def _count_decimals(value) -> int:
        """
        Return the number of decimal places of ``value`` in positional
        notation, e.g. 1 for 1.0, 2 for 0.25 and 5 for 1e-05.

        Inspecting ``str(value)`` is not sufficient: small or large floats
        are printed in scientific notation without a decimal point.
        """
        text = np.format_float_positional(float(value), trim="0")
        return len(text.partition(".")[2])

    def _single_simulation(self, kwargs):
        """
        Perform the single simulation for the given
        unzip directory and fmu_instance.
        See the docstring of simulate() for information on kwargs.

        The single argument kwarg is to make this
        function accessible by multiprocessing pool.map.
        Note that the supported entries are popped from the given dict.

        :return:
            Depending on ``return_option``: the path of the saved result
            file ("savepath"), a dict with the last values ("last_point")
            or a TimeSeriesData object (any other value). None if the
            simulation failed and ``fail_on_error`` is False.
        :raises TypeError:
            If inputs are neither a DataFrame nor TimeSeriesData.
        :raises IndexError:
            If inputs contain a column named 'time'.
        """
        # Unpack kwargs:
        parameters = kwargs.pop("parameters", None)
        return_option = kwargs.pop("return_option", "time_series")
        inputs = kwargs.pop("inputs", None)
        fail_on_error = kwargs.pop("fail_on_error", True)
        result_file_name = kwargs.pop("result_file_name", "resultFile")
        result_file_suffix = kwargs.pop("result_file_suffix", "csv")
        parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
        savepath = kwargs.pop("savepath", None)
        if kwargs:
            self.logger.error(
                "You passed the following kwargs which "
                "are not part of the supported kwargs and "
                "have thus no effect: %s.", ", ".join(kwargs))

        # A worker process may not have instantiated its fmu yet
        if self.use_mp and self._fmu_instance is None:
            self._setup_single_fmu_instance(use_mp=True)

        if inputs is not None:
            if not isinstance(inputs, (TimeSeriesData, pd.DataFrame)):
                raise TypeError("DataFrame or TimeSeriesData object expected for inputs.")
            inputs = inputs.copy()  # Create save copy
            if isinstance(inputs, TimeSeriesData):
                inputs = inputs.to_df(force_single_index=True)
            if "time" in inputs.columns:
                raise IndexError(
                    "Given inputs contain a column named 'time'. "
                    "The index is assumed to contain the time-information."
                )
            # Convert df to structured numpy array for fmpy: simulate_fmu
            inputs.insert(0, column="time", value=inputs.index)
            inputs_tuple = [tuple(columns) for columns in inputs.to_numpy()]
            # Try to match the type, default is np.double.
            # 'time' is not in self.inputs and thus handled separately.
            dtype = [(inputs.columns[0], np.double)]
            dtype += [
                (col, self._type_map.get(self.inputs[col].type, np.double))
                for col in inputs.columns[1:]
            ]
            inputs = np.array(inputs_tuple, dtype=dtype)
        if parameters is None:
            parameters = {}
        else:
            self.check_unsupported_variables(variables=list(parameters.keys()),
                                             type_of_var="parameters")
        try:
            # reset the FMU instance instead of creating a new one
            self._fmu_instance.reset()
            # Simulate
            res = fmpy.simulate_fmu(
                filename=self._unzip_dir,
                start_time=self.sim_setup.start_time,
                stop_time=self.sim_setup.stop_time,
                solver=self.sim_setup.solver,
                step_size=self.sim_setup.fixedstepsize,
                relative_tolerance=None,
                output_interval=self.sim_setup.output_interval,
                record_events=False,  # Used for an equidistant output
                start_values=parameters,
                apply_default_start_values=False,  # As we pass start_values already
                input=inputs,
                output=self.result_names,
                timeout=self.sim_setup.timeout,
                step_finished=None,
                model_description=self._model_description,
                fmu_instance=self._fmu_instance,
                fmi_type=self._fmi_type,
            )

        except Exception as error:
            self.logger.error(f"[SIMULATION ERROR] Error occurred while running FMU: \n {error}")
            if fail_on_error:
                raise  # Bare raise keeps the original traceback
            return None

        # Reshape result:
        df = pd.DataFrame(res).set_index("time")
        # Round the time index to the precision of the output interval
        df.index = np.round(df.index.astype("float64"),
                            self._count_decimals(self.sim_setup.output_interval))

        if return_option == "savepath":
            if savepath is None:
                savepath = self.working_directory

            os.makedirs(savepath, exist_ok=True)
            filepath = os.path.join(savepath, f"{result_file_name}.{result_file_suffix}")
            TimeSeriesData(df).droplevel(1, axis=1).save(
                filepath=filepath,
                key="simulation",
                engine=parquet_engine
            )

            return filepath
        if return_option == "last_point":
            return df.iloc[-1].to_dict()
        # Else return time series data
        tsd = TimeSeriesData(df, default_tag="sim")
        return tsd

    @staticmethod
    def _parse_start_value(start, python_type):
        """
        Convert the start value of a model variable to ``python_type``.

        Booleans given as text are parsed explicitly, as ``bool("false")``
        evaluates to True in Python.
        """
        if python_type is bool and isinstance(start, str):
            return start.strip().lower() in ("true", "1")
        return python_type(start)

    def setup_fmu_instance(self):
        """
        Manually set up and extract the data to
        avoid this step in the simulate function.

        The fmu is extracted next to the working directory, the model
        description is read, all model variables are sorted into inputs,
        outputs, parameters and states, and the fmu is instantiated
        (once per worker if multiprocessing is used).
        """
        self.logger.info("Extracting fmu and reading fmu model description")
        # First load model description and extract variables
        fmu_stem = os.path.splitext(os.path.basename(self.model_name))[0]
        self._single_unzip_dir = os.path.join(self.working_directory,
                                              fmu_stem + "_extracted")
        os.makedirs(self._single_unzip_dir, exist_ok=True)
        self._single_unzip_dir = fmpy.extract(self.model_name,
                                              unzipdir=self._single_unzip_dir)
        self._model_description = read_model_description(self._single_unzip_dir,
                                                         validate=True)

        if self._model_description.coSimulation is None:
            self._fmi_type = 'ModelExchange'
        else:
            self._fmi_type = 'CoSimulation'

        self.logger.info("Reading model variables")

        # Type names of the model description -> python types
        _types = {
            "Enumeration": int,
            "Integer": int,
            "Real": float,
            "Boolean": bool,
            "String": str
        }
        # Extract inputs, outputs & tuner (stored in the containers of the parent class)
        for var in self._model_description.modelVariables:
            python_type = _types[var.type]
            if var.start is not None:
                var.start = self._parse_start_value(var.start, python_type)

            _var_ebcpy = Variable(
                min=var.min,
                max=var.max,
                value=var.start,
                type=python_type
            )
            if var.causality == 'input':
                self.inputs[var.name] = _var_ebcpy
            elif var.causality == 'output':
                self.outputs[var.name] = _var_ebcpy
            elif var.causality in ('parameter', 'calculatedParameter'):
                self.parameters[var.name] = _var_ebcpy
            elif var.causality == 'local':
                self.states[var.name] = _var_ebcpy
            else:
                self.logger.error(f"Could not map causality {var.causality}"
                                  f" to any variable type.")

        if self.use_mp:
            self.logger.info("Extracting fmu %s times for "
                             "multiprocessing on %s processes",
                             self.n_cpu, self.n_cpu)
            self.pool.map(
                self._setup_single_fmu_instance,
                [True] * self.n_cpu
            )
            self.logger.info("Instantiated fmu's on all processes.")
        else:
            self._setup_single_fmu_instance(use_mp=False)

    def _setup_single_fmu_instance(self, use_mp):
        """
        Instantiate the fmu for the current process.

        :param bool use_mp:
            If True, the fmu is extracted into a worker specific directory
            and the instance is stored on class level. If False, the
            already extracted fmu is used and stored on the instance.
        :return: bool
            Always True.
        """
        if use_mp:
            wrk_idx = self.worker_idx
            if self._fmu_instance is not None:
                return True  # This worker is already set up
            unzip_dir = self._single_unzip_dir + f"_worker_{wrk_idx}"
            fmpy.extract(self.model_name,
                         unzipdir=unzip_dir)
        else:
            wrk_idx = 0
            unzip_dir = self._single_unzip_dir

        self.logger.info("Instantiating fmu for worker %s", wrk_idx)
        fmu_instance = fmpy.instantiate_fmu(
            unzipdir=unzip_dir,
            model_description=self._model_description,
            fmi_type=self._fmi_type,
            visible=False,
            debug_logging=False,
            logger=self._custom_logger,
            fmi_call_logger=None)
        if use_mp:
            FMU_API._fmu_instance = fmu_instance
            FMU_API._unzip_dir = unzip_dir
        else:
            self._fmu_instance = fmu_instance
            self._unzip_dir = unzip_dir
        return True

    def _custom_logger(self, component, instanceName, status, category, message):
        """ Print the FMU's log messages to the command line (works for both FMI 1.0 and 2.0) """
        # pylint: disable=unused-argument, invalid-name
        if not self.log_fmu:
            return
        if isinstance(message, (bytes, bytearray)):
            # A message with invalid bytes must not raise inside the callback
            message = message.decode("utf-8", errors="replace")
        self.logger.log(level=self._FMI_STATUS_LEVELS[status], msg=message)

    def save_for_reproduction(self,
                              title: str,
                              path: pathlib.Path = None,
                              files: list = None,
                              **kwargs):
        """
        Additionally to the basic reproduction, add info
        for FMU files.

        The fmu file itself is added as ``FMU/<file name>`` to the files
        passed on to the parent class. The given ``files`` list is
        copied and thus not modified.

        :return:
            The return value of the parent class' function.
        """
        files = [] if files is None else list(files)
        files.append(CopyFile(
            filename="FMU/" + pathlib.Path(self.model_name).name,
            sourcepath=pathlib.Path(self.model_name),
            remove=False
        ))
        return super().save_for_reproduction(
            title=title,
            path=path,
            files=files,
            **kwargs
        )