Coverage for agentlib/models/fmu_model.py: 32%

164 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1"""This module contains the FMUModel class.""" 

2 

3import queue 

4import shutil 

5import os 

6import logging 

7import pathlib 

8import uuid 

9from itertools import chain 

10from typing import Union, List 

11 

12import attrs 

13import pydantic 

14 

15from pydantic import field_validator, FilePath 

16from agentlib.core import Model, ModelConfig 

17from agentlib.core.errors import OptionalDependencyError 

18from agentlib.core.datamodels import ModelVariable, Causality 

19 

20try: 

21 import fmpy.fmi2 

22 import fmpy 

23 from fmpy.fmi1 import FMICallException 

24except ImportError as err: 

25 raise OptionalDependencyError( 

26 dependency_name="fmu", dependency_install="fmpy", used_object="FMU-model" 

27 ) from err 

28 

29logger = logging.getLogger(__name__) 

30 

31 

class FmuModelConfig(ModelConfig):
    """
    The Config of FMUModels overwrites the default
    ModelConfig to redefine the system and add relevant
    fields like path and tolerance of the simulation.
    """

    # Path to the model file. Only ".fmu" and ".mo" suffixes pass validation.
    path: FilePath
    # Tolerance handed to the FMU when the experiment is set up.
    tolerance: float = 0.001
    # If True, the FMU is unzipped into a uniquely named directory next to the
    # file; otherwise fmpy chooses the extraction directory itself.
    extract_fmu: bool = False
    # If True, log messages emitted by the FMU are forwarded to the python logger.
    log_fmu: bool = True
    only_config_variables: bool = pydantic.Field(
        default=True,
        description="If True, only the variables passed to this model by a simulator "
        "will be read and written at each simulation step (specified by "
        "dt).",
    )

    @field_validator("path")
    @classmethod
    def check_path(cls, path):
        """Check if the path has the correct extension.

        Args:
            path: The already type-validated path of the model file.

        Returns:
            The unchanged path.

        Raises:
            ValueError: If the suffix is neither ".fmu" nor ".mo". Pydantic
                reports this as a validation error of the config.
        """
        # An explicit raise is used instead of ``assert`` so the check is not
        # stripped when python runs with optimizations (-O).
        if path.suffix not in (".fmu", ".mo"):
            raise ValueError("Unknown file-extension")
        return path

57 

58 

class FmuModel(Model):
    """Class to wrap any FMU Model into the Model-Standard
    of the agentlib.

    On construction the FMU is extracted, its model description is read and
    the variables found there are written into the model config. Values set
    through ``set_input_values`` / ``set_parameter_values`` are queued and
    only pushed into the FMU at the beginning of the next ``do_step``.

    The model can be used as a context manager; leaving the context
    terminates the FMU instance and removes the extracted files.
    """

    config: FmuModelConfig

    def __init__(self, **kwargs):
        # Private map to link variable names (str) to the fmu value reference (int)
        self._variables_vr = {}
        self._unzip_dir = None
        # Initialize config
        super().__init__(**kwargs)
        # Extract the FMU, create the slave and populate the config variables
        self.system = self.__init_fmu()
        # All inputs and parameters are written once before the first step
        self._variables_to_write = queue.Queue()
        for var in self.inputs + self.parameters:
            self._variables_to_write.put(var)

    @property
    def tolerance(self):
        """Get the tolerance of FMU simulation"""
        return self.config.tolerance

    @tolerance.setter
    def tolerance(self, tolerance: float):
        """Set the tolerance in the config."""
        self.config.tolerance = tolerance

    @tolerance.deleter
    def tolerance(self):
        """Delete the tolerance and restore the default value."""
        # Pydantic (v2) models do not expose field defaults as class
        # attributes, hence the default is taken from the field definition.
        default = self.get_config_type().model_fields["tolerance"].default
        self.config.tolerance = default

    @property
    def extract_fmu(self):
        """Get whether the fmu shall be extracted to a new
        directory or if the temp folder is used."""
        return self.config.extract_fmu

    def do_step(self, *, t_start, t_sample=None):
        """Simulate the FMU from ``t_start`` over one sample interval.

        Args:
            t_start: Time at which the step starts.
            t_sample: Length of the step. Defaults to ``self.dt``.

        Returns:
            True once the step has finished and the values were read back.

        Raises:
            RuntimeError: If the FMU raises an FMICallException during a step.
        """
        if t_sample is None:
            t_sample = self.dt
        # Write current values to system
        while not self._variables_to_write.empty():
            self.__write_value(self._variables_to_write.get_nowait())
        t_samples = self._create_time_samples(t_sample=t_sample) + t_start
        try:
            # Step from each communication point to its successor
            for t_current, t_next in zip(t_samples[:-1], t_samples[1:]):
                self.system.doStep(
                    currentCommunicationPoint=t_current,
                    communicationStepSize=t_next - t_current,
                )
        except FMICallException as e:
            # Raise a different error, as simpy does not work well with FMI Errors
            raise RuntimeError(
                "The fmu had an internal error. Please check the logs to analyze it."
            ) from e
        # Read current values from system
        self.__read_values()
        return True

    def initialize(self, **kwargs):
        """
        Initializes FMU model

        Instantiates the FMU, resets it, sets up the experiment and runs
        through the initialization mode.

        Required kwargs:
            t_start (float): Start time of simulation
            t_stop (float): Stop time of simulation
        """
        logger.info("Initializing model...")
        # Handle Logging of the FMU itself:
        try:
            callbacks = fmpy.fmi2.fmi2CallbackFunctions()
            callbacks.logger = fmpy.fmi2.fmi2CallbackLoggerTYPE(self._fmu_logger)
            callbacks.allocateMemory = fmpy.fmi2.fmi2CallbackAllocateMemoryTYPE(
                fmpy.calloc
            )
            callbacks.freeMemory = fmpy.fmi2.fmi2CallbackFreeMemoryTYPE(fmpy.free)
            from fmpy.logging import addLoggerProxy
            from ctypes import byref

            addLoggerProxy(byref(callbacks))
        except Exception as err:
            # Best effort: without custom callbacks the FMU is still usable
            logger.error("Could not setup custom logger in FMU model: %s", err)
            callbacks = None

        self.system.instantiate(
            callbacks=callbacks,
            loggingOn=False,  # Only for debug of fmu itself
            visible=False,  # Only for debug of fmu itself
        )
        self.system.reset()
        self.system.setupExperiment(
            startTime=kwargs["t_start"],
            stopTime=kwargs["t_stop"],
            tolerance=self.tolerance,
        )
        self.system.enterInitializationMode()
        self.system.exitInitializationMode()
        logger.info("Model: %s initialized", self.name)

    def _fmu_logger(self, component, instanceName, status, category, message):
        """Print the FMU's log messages to the command line (works for both FMI 1.0 and 2.0)"""
        # pylint: disable=unused-argument, invalid-name
        if not self.config.log_fmu:
            return
        # Log level per status index, in the order
        # OK, WARNING, DISCARD, ERROR, FATAL, PENDING
        levels = (
            logging.INFO,
            logging.WARNING,
            logging.WARNING,
            logging.ERROR,
            logging.FATAL,
            logging.FATAL,
        )
        # Replace undecodable bytes so a malformed message is still logged
        # instead of raising inside the callback.
        logger.log(
            level=levels[status], msg=message.decode("utf-8", errors="replace")
        )

    def __init_fmu(self) -> fmpy.fmi2.FMU2Slave:
        """Extract the FMU, read its description and set up the variables.

        Side effects: sets ``self._unzip_dir``, ``self.name``, optionally
        ``self.description``, ``self._variables_vr`` and replaces
        ``self.config`` with one that contains the variables of the FMU.

        Returns:
            The not yet instantiated FMU slave.
        """
        path = self.config.path
        # System setup:
        # extract the FMU
        if self.extract_fmu:
            # Create own unzip directory
            _path = pathlib.Path(path)
            if not _path.is_absolute():
                _path = pathlib.Path(os.getcwd()).joinpath(_path)
            _unzip_dir = str(
                _path.parent.joinpath(
                    f'{_path.name.replace(".fmu", "")}_extracted_{uuid.uuid4()}'
                )
            )
        else:
            _unzip_dir = None
        cur_cwd = os.getcwd()
        try:
            self._unzip_dir = fmpy.extract(filename=path, unzipdir=_unzip_dir)
        finally:
            # Reset cwd. fmpy changes it sometimes. Done in finally so the cwd
            # is also restored if the extraction fails.
            os.chdir(cur_cwd)
        # Read the model description
        _model_description = fmpy.read_model_description(self._unzip_dir, validate=True)
        _system = fmpy.fmi2.FMU2Slave(
            guid=_model_description.guid,
            unzipDirectory=self._unzip_dir,
            modelIdentifier=_model_description.coSimulation.modelIdentifier,
            instanceName=__name__,
            fmiCallLogger=None,
        )
        self.name = _model_description.modelName
        if _model_description.description is not None:
            self.description = _model_description.description

        # Variable setup:
        # Get the inputs, outputs, internals and parameters
        self._variables_vr = {}
        _vars = {
            Causality.input: [],
            Causality.parameter: [],
            Causality.calculatedParameter: [],
            Causality.output: [],
            Causality.local: [],
            Causality.independent: [],
        }
        config_vars = set(self.config.get_variable_names())
        for _model_var in _model_description.modelVariables:
            if _model_var.type == "String":
                logger.warning(
                    "String variable %s omitted. Not supported in AgentLib.",
                    _model_var.name,
                )
                continue  # Don't allow string model variables

            # if desired, we skip adding variables to this model instance if they are
            # not specified from outside. They will remain within the
            # fmpy.fmi2.FMU2Slave instance
            if self.config.only_config_variables and _model_var.name not in config_vars:
                continue

            _vars[_model_var.causality].append(
                self._fmu_variable_to_dict(_model_var)
            )
            self._variables_vr[_model_var.name] = _model_var.valueReference
        # This sets the inputs, outputs, internals and parameters and variables
        self.config = {
            "inputs": _vars[Causality.input],
            "outputs": _vars[Causality.output],
            "parameters": _vars[Causality.parameter]
            + _vars[Causality.calculatedParameter],
            "states": _vars[Causality.local] + _vars[Causality.independent],
            **self.config.model_dump(
                exclude={"inputs", "outputs", "states", "parameters"}
            ),
        }
        return _system

    def _fmu_variable_to_dict(self, model_var) -> dict:
        """Build the keyword dict of an agentlib variable from an FMU variable."""
        # Only inputs and (calculated) parameters take over the start value of
        # the FMU; every other variable starts without a value.
        takes_start_value = (
            model_var.causality
            in [
                Causality.parameter,
                Causality.calculatedParameter,
                Causality.input,
            ]
            and model_var.start is not None
        )
        value = (
            self._converter(model_var.type, model_var.start)
            if takes_start_value
            else None
        )
        # Fall back to the ModelVariable defaults if the FMU gives no info
        defaults = attrs.fields(ModelVariable)
        unit = model_var.unit if model_var.unit is not None else defaults.unit.default
        description = (
            model_var.description
            if model_var.description is not None
            else defaults.description.default
        )
        return dict(
            name=model_var.name,
            type=model_var.type,
            value=value,
            unit=unit,
            description=description,
            causality=model_var.causality,
            variability=model_var.variability,
        )

    def __write_value(self, var: ModelVariable):
        """Write the value of a single variable into the FMU.

        Variables whose value is None are skipped, variables of an
        unsupported type are only reported through the logger.
        """
        # One can only set inputs and parameters!

        if var.value is None:
            logger.error(
                "Tried setting the value of %s to None. This will not be set in the "
                "FMU.",
                var.name,
            )
            return

        _vr = self._variables_vr[var.name]
        if var.type == "Real":
            self.system.setReal([_vr], [float(var.value)])
        elif var.type in ["Integer", "Enumeration"]:
            self.system.setInteger([_vr], [int(var.value)])
        elif var.type == "Boolean":
            self.system.setBoolean([_vr], [bool(var.value)])
        else:
            logger.error("Variable %s not valid for this model!", var.name)

    def set_input_values(self, names: List[str], values: List[Union[float, int, bool]]):
        """Sets input values in the model and in the FMU."""
        super().set_input_values(names=names, values=values)
        # Queue the changed inputs, they are written at the next do_step
        for name in names:
            self._variables_to_write.put(self._inputs[name])

    def set_parameter_values(
        self, names: List[str], values: List[Union[float, int, bool]]
    ):
        """Sets parameter values in the model and in the FMU."""
        super().set_parameter_values(names=names, values=values)
        # Queue the changed parameters, they are written at the next do_step
        for name in names:
            self._variables_to_write.put(self._parameters[name])

    def __read_values(self):
        """Read outputs, parameters and states from the FMU into the model.

        Raises:
            TypeError: If a variable has a type other than Real, Integer,
                Enumeration or Boolean.
        """
        for _var in chain.from_iterable([self.outputs, self.parameters, self.states]):
            _vr = self._variables_vr[_var.name]
            if _var.type == "Real":
                _var.value = self.system.getReal([_vr])[0]
            elif _var.type in ["Integer", "Enumeration"]:
                _var.value = self.system.getInteger([_vr])[0]
            elif _var.type == "Boolean":
                _var.value = self.system.getBoolean([_vr])[0]
            else:
                raise TypeError(
                    f"Unsupported type: {_var.type} for variable {_var.name}"
                )

    def __enter__(self):
        """Enter the context and hand out the model itself."""
        # The FMU must stay alive inside the with-block, so nothing is
        # terminated here; the cleanup happens in __exit__.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Terminate the FMU instance and remove the extracted files."""
        self._terminate_and_free_instance()
        # clean up; nothing to remove if the FMU was never extracted
        if self._unzip_dir is not None:
            shutil.rmtree(self._unzip_dir, ignore_errors=True)

    def _terminate_and_free_instance(self):
        """Terminate and free the FMU instance, logging instead of raising."""
        try:
            self.system.terminate()
        except Exception as err:
            logger.error("Could not terminate FMU instance %s", err)
        try:
            self.system.freeInstance()
        except Exception as err:
            logger.error("Could not free FMU instance %s", err)

    def terminate(self):
        """Overwrite base method"""
        self.__exit__(exc_type=None, exc_val=None, exc_tb=None)

    @staticmethod
    def _converter(type_of_var: str, value):
        """Cast ``value`` to the python type matching the FMU type name.

        Values of unknown types are returned unchanged.
        """
        _mapper = {"Boolean": bool, "Real": float, "Integer": int, "Enumeration": int}
        if type_of_var in _mapper:
            return _mapper[type_of_var](value)
        # else
        return value

358 return value