Coverage for agentlib/models/fmu_model.py: 33%

165 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-12-23 08:15 +0000

1"""This module contains the FMUModel class.""" 

2 

3import queue 

4import shutil 

5import os 

6import logging 

7import pathlib 

8import uuid 

9from itertools import chain 

10from typing import Union, List 

11 

12import attrs 

13import pydantic 

14from pydantic import field_validator, FilePath 

15 

16from agentlib.utils import create_time_samples 

17from agentlib.core import Model, ModelConfig 

18from agentlib.core.errors import OptionalDependencyError 

19from agentlib.core.datamodels import ModelVariable, Causality 

20 

21try: 

22 import fmpy.fmi2 

23 import fmpy 

24 from fmpy.fmi1 import FMICallException 

25except ImportError as err: 

26 raise OptionalDependencyError( 

27 dependency_name="fmu", dependency_install="fmpy", used_object="FMU-model" 

28 ) from err 

29 

30logger = logging.getLogger(__name__) 

31 

32 

33class FmuModelConfig(ModelConfig): 

34 """ 

35 The Config of FMUModels overwrite the default 

36 ModelConfig to redefine the system and add relevant 

37 fields like path and tolerance of the simulation. 

38 """ 

39 

40 path: FilePath 

41 tolerance: float = 0.001 

42 extract_fmu: bool = False 

43 log_fmu: bool = True 

44 only_config_variables: bool = pydantic.Field( 

45 default=True, 

46 description="If True, only the variables passed to this model by a simulator " 

47 "will be read and written at each simulation step (specified by " 

48 "dt).", 

49 ) 

50 

51 @field_validator("path") 

52 @classmethod 

53 def check_path(cls, path): 

54 """Check if the path has the correct extension""" 

55 # check file extension 

56 assert path.suffix in [".fmu", ".mo"], "Unknown file-extension" 

57 return path 

58 

59 

60class FmuModel(Model): 

61 """Class to wrap any FMU Model into the Model-Standard 

62 of the agentlib. 

63 """ 

64 

65 config: FmuModelConfig 

66 

67 def __init__(self, **kwargs): 

68 # Private map to link variable names (str) to the fmu value reference (int) 

69 self._variables_vr = {} 

70 self._unzip_dir = None 

71 # Initialize config 

72 super().__init__(**kwargs) 

73 # Actively set the path to trigger the automatic setup (evokes __init_fmu) 

74 self.system = self.__init_fmu() 

75 initial_write_values = self.inputs + self.parameters 

76 self._variables_to_write = queue.Queue() 

77 [self._variables_to_write.put(v) for v in initial_write_values] 

78 

79 @property 

80 def tolerance(self): 

81 """Get the tolerance of FMU simulation""" 

82 return self.config.tolerance 

83 

84 @tolerance.setter 

85 def tolerance(self, tolerance: float): 

86 """Set the tolerance in the config.""" 

87 self.config.tolerance = tolerance 

88 

89 @tolerance.deleter 

90 def tolerance(self): 

91 """Delete the tolerance and restore the default value.""" 

92 self.config.tolerance = self.get_config_type().tolerance 

93 

94 @property 

95 def extract_fmu(self): 

96 """Get whether the fmu shall be extracted to a new 

97 directory or if the temp folder is used.""" 

98 return self.config.extract_fmu 

99 

100 def do_step(self, *, t_start, t_sample=None): 

101 if t_sample is None: 

102 t_sample = self.dt 

103 # Write current values to system 

104 while not self._variables_to_write.empty(): 

105 self.__write_value(self._variables_to_write.get_nowait()) 

106 t_samples = create_time_samples(t_end=t_sample, dt=self.dt) + t_start 

107 try: 

108 for _idx, _t_sample in enumerate(t_samples[:-1]): 

109 # do step 

110 self.system.doStep( 

111 currentCommunicationPoint=_t_sample, 

112 communicationStepSize=t_samples[_idx + 1] - _t_sample, 

113 ) 

114 except FMICallException as e: 

115 # Raise a different error, as simpy does not work well with FMI Errors 

116 raise RuntimeError( 

117 "The fmu had an internal error. Please check the logs to analyze it." 

118 ) from e 

119 # Read current values from system 

120 self.__read_values() 

121 return True 

122 

123 def initialize(self, **kwargs): 

124 """ 

125 Initializes FMU model 

126 

127 Required kwargs: 

128 t_start (float): Start time of simulation 

129 t_stop (float): Stop time of simulation 

130 """ 

131 logger.info("Initializing model...") 

132 # Handle Logging of the FMU itself: 

133 try: 

134 callbacks = fmpy.fmi2.fmi2CallbackFunctions() 

135 callbacks.logger = fmpy.fmi2.fmi2CallbackLoggerTYPE(self._fmu_logger) 

136 callbacks.allocateMemory = fmpy.fmi2.fmi2CallbackAllocateMemoryTYPE( 

137 fmpy.calloc 

138 ) 

139 callbacks.freeMemory = fmpy.fmi2.fmi2CallbackFreeMemoryTYPE(fmpy.free) 

140 from fmpy.logging import addLoggerProxy 

141 from ctypes import byref 

142 

143 addLoggerProxy(byref(callbacks)) 

144 except Exception as err: 

145 logger.error("Could not setup custom logger in FMU model: %s", err) 

146 callbacks = None 

147 

148 self.system.instantiate( 

149 callbacks=callbacks, 

150 loggingOn=False, # Only for debug of fmu itself 

151 visible=False, # Only for debug of fmu itself 

152 ) 

153 self.system.reset() 

154 self.system.setupExperiment( 

155 startTime=kwargs["t_start"], 

156 stopTime=kwargs["t_stop"], 

157 tolerance=self.tolerance, 

158 ) 

159 self.system.enterInitializationMode() 

160 self.system.exitInitializationMode() 

161 logger.info("Model: %s initialized", self.name) 

162 

163 def _fmu_logger(self, component, instanceName, status, category, message): 

164 """Print the FMU's log messages to the command line (works for both FMI 1.0 and 2.0)""" 

165 # pylint: disable=unused-argument, invalid-name 

166 if self.config.log_fmu: 

167 label = ["OK", "WARNING", "DISCARD", "ERROR", "FATAL", "PENDING"][status] 

168 _level_map = { 

169 "OK": logging.INFO, 

170 "WARNING": logging.WARNING, 

171 "DISCARD": logging.WARNING, 

172 "ERROR": logging.ERROR, 

173 "FATAL": logging.FATAL, 

174 "PENDING": logging.FATAL, 

175 } 

176 logger.log(level=_level_map[label], msg=message.decode("utf-8")) 

177 

178 def __init_fmu(self) -> fmpy.fmi2.FMU2Slave: 

179 path = self.config.path 

180 # System setup: 

181 # extract the FMU 

182 if self.extract_fmu: 

183 # Create own unzip directory 

184 _path = pathlib.Path(path) 

185 if not _path.is_absolute(): 

186 _path = pathlib.Path(os.getcwd()).joinpath(_path) 

187 _unzip_dir = _path.parents[0].joinpath( 

188 f'{_path.name.replace(".fmu", "")}' f"_extracted_{uuid.uuid4()}" 

189 ) 

190 _unzip_dir = str(_unzip_dir) 

191 else: 

192 _unzip_dir = None 

193 cur_cwd = os.getcwd() 

194 self._unzip_dir = fmpy.extract(filename=path, unzipdir=_unzip_dir) 

195 os.chdir(cur_cwd) # Reset cwd. fmpy changes it sometimes. 

196 # Read the model description 

197 _model_description = fmpy.read_model_description(self._unzip_dir, validate=True) 

198 _system = fmpy.fmi2.FMU2Slave( 

199 guid=_model_description.guid, 

200 unzipDirectory=self._unzip_dir, 

201 modelIdentifier=_model_description.coSimulation.modelIdentifier, 

202 instanceName=__name__, 

203 fmiCallLogger=None, 

204 ) 

205 self.name = _model_description.modelName 

206 if _model_description.description is not None: 

207 self.description = _model_description.description 

208 

209 # Variable setup: 

210 # Get the inputs, outputs, internals and parameters 

211 self._variables_vr = {} 

212 _vars = { 

213 Causality.input: [], 

214 Causality.parameter: [], 

215 Causality.calculatedParameter: [], 

216 Causality.output: [], 

217 Causality.local: [], 

218 Causality.independent: [], 

219 } 

220 config_vars = set(self.config.get_variable_names()) 

221 for _model_var in _model_description.modelVariables: 

222 # Convert to an agentlib ModelVariable object 

223 if _model_var.type == "String": 

224 logger.warning( 

225 "String variable %s omitted. Not supported in AgentLib.", 

226 _model_var.name, 

227 ) 

228 continue # Don't allow string model variables 

229 

230 # if desired, we skip adding variables to this model instance if they are 

231 # not specified from outside. They will remain within the 

232 # fmpy.fmi2.FMU2Slave instance 

233 if self.config.only_config_variables and _model_var.name not in config_vars: 

234 continue 

235 

236 _vars[_model_var.causality].append( 

237 dict( 

238 name=_model_var.name, 

239 type=_model_var.type, 

240 value=( 

241 self._converter(_model_var.type, _model_var.start) 

242 if ( 

243 _model_var.causality 

244 in [ 

245 Causality.parameter, 

246 Causality.calculatedParameter, 

247 Causality.input, 

248 ] 

249 and _model_var.start is not None 

250 ) 

251 else None 

252 ), 

253 unit=( 

254 _model_var.unit 

255 if _model_var.unit is not None 

256 else attrs.fields(ModelVariable).unit.default 

257 ), 

258 description=( 

259 _model_var.description 

260 if _model_var.description is not None 

261 else attrs.fields(ModelVariable).description.default 

262 ), 

263 causality=_model_var.causality, 

264 variability=_model_var.variability, 

265 ) 

266 ) 

267 self._variables_vr.update({_model_var.name: _model_var.valueReference}) 

268 # This sets the inputs, outputs, internals and parameters and variables 

269 self.config = { 

270 "inputs": _vars[Causality.input], 

271 "outputs": _vars[Causality.output], 

272 "parameters": _vars[Causality.parameter] 

273 + _vars[Causality.calculatedParameter], 

274 "states": _vars[Causality.local] + _vars[Causality.independent], 

275 **self.config.model_dump( 

276 exclude={"inputs", "outputs", "states", "parameters"} 

277 ), 

278 } 

279 return _system 

280 

281 def __write_value(self, var: ModelVariable): 

282 # One can only set inputs and parameters! 

283 

284 if var.value is None: 

285 logger.error( 

286 "Tried setting the value of %s to None. This will not be set in the " 

287 "FMU.", 

288 var.name, 

289 ) 

290 return 

291 

292 _vr = self._variables_vr[var.name] 

293 if var.type == "Real": 

294 self.system.setReal([_vr], [float(var.value)]) 

295 elif var.type in ["Integer", "Enumeration"]: 

296 self.system.setInteger([_vr], [int(var.value)]) 

297 elif var.type == "Boolean": 

298 self.system.setBoolean([_vr], [bool(var.value)]) 

299 else: 

300 logger.error("Variable %s not valid for this model!", var.name) 

301 

302 def set_input_values(self, names: List[str], values: List[Union[float, int, bool]]): 

303 """Sets input values in the model and in the FMU.""" 

304 super().set_input_values(names=names, values=values) 

305 for name in names: 

306 var = self._inputs[name] 

307 self._variables_to_write.put(var) 

308 

309 def set_parameter_values( 

310 self, names: List[str], values: List[Union[float, int, bool]] 

311 ): 

312 """Sets parameter values in the model and in the FMU.""" 

313 super().set_parameter_values(names=names, values=values) 

314 for name in names: 

315 var = self._parameters[name] 

316 self._variables_to_write.put(var) 

317 

318 def __read_values(self): 

319 for _var in chain.from_iterable([self.outputs, self.parameters, self.states]): 

320 _vr = self._variables_vr[_var.name] 

321 if _var.type == "Real": 

322 _var.value = self.system.getReal([_vr])[0] 

323 elif _var.type in ["Integer", "Enumeration"]: 

324 _var.value = self.system.getInteger([_vr])[0] 

325 elif _var.type == "Boolean": 

326 _var.value = self.system.getBoolean([_vr])[0] 

327 else: 

328 raise TypeError( 

329 f"Unsupported type: {_var.type} for variable {_var.name}" 

330 ) 

331 

332 def __enter__(self): 

333 self._terminate_and_free_instance() 

334 

335 def __exit__(self, exc_type, exc_val, exc_tb): 

336 self._terminate_and_free_instance() 

337 # clean up 

338 shutil.rmtree(self._unzip_dir, ignore_errors=True) 

339 

340 def _terminate_and_free_instance(self): 

341 try: 

342 self.system.terminate() 

343 except Exception as err: 

344 logger.error("Could not terminate FMU instance %s", err) 

345 try: 

346 self.system.freeInstance() 

347 except Exception as err: 

348 logger.error("Could not terminate FMU instance %s", err) 

349 

350 def terminate(self): 

351 """Overwrite base method""" 

352 self.__exit__(exc_type=None, exc_val=None, exc_tb=None) 

353 

354 @staticmethod 

355 def _converter(type_of_var: str, value): 

356 _mapper = {"Boolean": bool, "Real": float, "Integer": int, "Enumeration": int} 

357 if type_of_var in _mapper: 

358 return _mapper[type_of_var](value) 

359 # else 

360 return value