Coverage for ebcpy/simulationapi/__init__.py: 91%

293 statements  

coverage.py v7.4.4, created at 2025-08-26 09:12 +0000

1""" 

2Simulation APIs help you to perform automated 

3simulations for energy and building climate related models. 

4Parameters can easily be updated, and the initialization-process is 

5much more user-friendly than the provided APIs by Dymola or fmpy. 

6""" 

7import pathlib 

8import warnings 

9import os 

10import sys 

11import itertools 

12import time 

13from pathlib import Path 

14from datetime import timedelta 

15from typing import Dict, Union, TypeVar, Any, List 

16from abc import abstractmethod 

17import multiprocessing as mp 

18 

19import pydantic 

20from pydantic import BaseModel, Field, field_validator 

21import numpy as np 

22from ebcpy.utils import setup_logger 

23from ebcpy.utils.reproduction import save_reproduction_archive 

24from shutil import disk_usage 

25 

26 

class Variable(BaseModel):
    """
    Data-Class to store relevant information for a
    simulation variable (input, parameter, output or local/state).
    """
    type: Any = Field(
        default=None,
        title='type',
        description='Type of the variable'
    )
    value: Any = Field(
        description="Default variable value"
    )
    max: Any = Field(
        default=None,
        title='max',
        description="Maximal value (upper bound) of the variable's value. "
                    "Only for int and float variables."
    )
    min: Any = Field(
        default=None,
        title='min',
        description="Minimal value (lower bound) of the variable's value. "
                    "Only for int and float variables."
    )

    @field_validator("value")
    @classmethod
    def check_value_type(cls, value, info: pydantic.FieldValidationInfo):
        """Check if the given value has the correct type."""
        _type = info.data["type"]
        if _type is None:
            return value  # No type -> no conversion
        if value is None:
            return value  # Setting None is allowed.
        if not isinstance(value, _type):
            return _type(value)
        return value

    @field_validator('max', 'min')
    @classmethod
    def check_value(cls, value, info: pydantic.FieldValidationInfo):
        """Check if the given bounds are correct."""
        # Check if the variable type even allows for min/max bounds
        _type = info.data["type"]
        if _type is None:
            return value  # No type -> no conversion
        if _type not in (float, int, bool):
            if value is not None:
                warnings.warn(
                    "Setting a min/max for variables "
                    f"of type {_type} is not supported."
                )
            return None
        if value is not None:
            return _type(value)
        if info.field_name == "min":
            return -np.inf if _type != bool else False
        # else it is max
        return np.inf if _type != bool else True
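
# A minimal usage sketch (assuming the pydantic v2 semantics used above):
# the validators coerce a provided value to the declared type and cast
# given bounds accordingly.
# >>> var = Variable(type=float, value="3", min=0, max=10)
# >>> var.value, var.min, var.max
# (3.0, 0.0, 10.0)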

class SimulationSetup(BaseModel):
    """
    pydantic BaseModel child to define relevant
    parameters to set up the simulation.
    """
    start_time: float = Field(
        default=0,
        description="The start time of the simulation",
        title="start_time"
    )
    stop_time: float = Field(
        default=1,
        description="The stop / end time of the simulation",
        title="stop_time"
    )
    output_interval: float = Field(
        default=1,
        description="The step size of the simulation and "
                    "thus also output interval of results.",
        title="output_interval"
    )
    fixedstepsize: float = Field(
        title="fixedstepsize",
        default=0.0,
        description="Fixed step size for Euler"
    )
    solver: str = Field(
        title="solver",
        default="",  # Is added in the field_validator
        description="The solver to be used for numerical integration."
    )
    _default_solver: str = None
    _allowed_solvers: list = []

    @field_validator("solver")
    @classmethod
    def check_valid_solver(cls, solver):
        """
        Check if the solver is in the list of valid solvers
        """
        if not solver:
            return cls.__private_attributes__['_default_solver'].default
        allowed_solvers = cls.__private_attributes__['_allowed_solvers'].default
        if solver not in allowed_solvers:
            raise ValueError(f"Given solver '{solver}' is not supported! "
                             f"Supported are '{allowed_solvers}'")
        return solver

    class Config:
        """Overwrite default pydantic Config"""
        extra = 'forbid'


SimulationSetupClass = TypeVar("SimulationSetupClass", bound=SimulationSetup)
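
# A sketch of how an API-specific child class would restrict the solver
# choice (class name and solver list are hypothetical, not part of ebcpy):
# >>> class MySimulationSetup(SimulationSetup):
# ...     _default_solver: str = "CVode"
# ...     _allowed_solvers: list = ["CVode", "Euler"]
# An empty solver string then resolves to "CVode", while anything outside
# _allowed_solvers raises a ValueError in check_valid_solver.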

class SimulationAPI:
    """Base-class for simulation APIs. Every simulation-api class
    must inherit from this class. It defines the structure of each class.

    :param str,Path working_directory:
        Working directory path
    :param str model_name:
        Name of the model being simulated.
    :keyword int n_cpu:
        Number of cores to be used by the simulation.
        If not given, a single core is used.
        The maximum number equals the cpu count of the device.
        **Warning**: Logging is not yet fully working on multiple processes.
        Output will be written to the stream handler, but not to the created
        .log files.
    :keyword bool save_logs: If logs should be stored.
    """
    _sim_setup_class: SimulationSetupClass = SimulationSetup
    _items_to_drop = [
        'pool',
    ]

    def __init__(self, working_directory: Union[Path, str], model_name: str,
                 **kwargs):
        # Private helper attrs for multiprocessing
        self._n_sim_counter = 0
        self._n_sim_total = 0
        self._progress_int = 0
        # Handle deprecation warning
        self._working_directory = None  # Define instance attribute
        self.working_directory = Path(working_directory).absolute()
        save_logs = kwargs.get("save_logs", True)
        self.logger = setup_logger(
            working_directory=self.working_directory if save_logs else None,
            name=self.__class__.__name__
        )
        # Setup the logger
        self.logger.info(f'{"-" * 25}Initializing class {self.__class__.__name__}{"-" * 25}')
        # Check multiprocessing
        self.n_cpu = kwargs.get("n_cpu", 1)
        if self.n_cpu > mp.cpu_count():
            raise ValueError(f"Given n_cpu '{self.n_cpu}' is greater "
                             "than the available number of "
                             f"cpus on your machine '{mp.cpu_count()}'")
        if self.n_cpu > 1:
            # pylint: disable=consider-using-with
            self.pool = mp.Pool(processes=self.n_cpu)
            self.use_mp = True
        else:
            self.pool = None
            self.use_mp = False
        # Setup the model
        self._sim_setup = self._sim_setup_class()
        self.inputs: Dict[str, Variable] = {}      # Inputs of model
        self.outputs: Dict[str, Variable] = {}     # Outputs of model
        self.parameters: Dict[str, Variable] = {}  # Parameters of model
        self.states: Dict[str, Variable] = {}      # States of model
        self.result_names = []
        self._model_name = None
        self.model_name = model_name

    # MP-Functions
    @property
    def worker_idx(self):
        """Index of the current worker"""
        _id = mp.current_process()._identity
        if _id:
            return _id[0]
        return None

    def _get_worker_directory(self, use_mp: bool):
        """
        Returns the current working directory for the process / worker.
        If this function is called from the main process, we
        always return the main working_directory.

        :param bool use_mp: Indicates if the central working directory
            is needed or the worker one.
        """
        if use_mp:
            worker_idx = self.worker_idx
            if worker_idx is None:
                # Called outside of multiprocessing
                return self.working_directory
            return self.working_directory.joinpath(f"worker_{worker_idx}")
        return self.working_directory
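
# Example: with n_cpu=2, the pool's worker processes typically get
# _identity (1,) and (2,), so their simulations run in
# <working_directory>/worker_1 and <working_directory>/worker_2, while the
# main process (empty _identity) keeps the main working directory.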

    def __getstate__(self):
        """Overwrite magic method to allow pickling the api object"""
        self_dict = self.__dict__.copy()
        for item in self._items_to_drop:
            del self_dict[item]
        return self_dict

    def __setstate__(self, state):
        """Overwrite magic method to allow pickling the api object"""
        self.__dict__.update(state)

    def close(self):
        """Base function for closing the simulation-program."""
        if self.use_mp:
            try:
                self.pool.map(self._close_multiprocessing,
                              list(range(self.n_cpu)))
                self.pool.close()
                self.pool.join()
            except ValueError:
                pass  # Already closed prior to atexit

    @abstractmethod
    def _close_multiprocessing(self, _):
        raise NotImplementedError(f'{self.__class__.__name__}.close '
                                  f'function is not defined')

    @abstractmethod
    def _single_close(self, **kwargs):
        """Base function for closing the simulation-program of a single core"""
        raise NotImplementedError(f'{self.__class__.__name__}._single_close '
                                  f'function is not defined')

    @abstractmethod
    def simulate(self,
                 parameters: Union[dict, List[dict]] = None,
                 return_option: str = "time_series",
                 **kwargs):
        """
        Base function for simulating the simulation-model.

        :param dict parameters:
            Parameters to simulate.
            Names of parameters are keys, values are values of the dict.
            It is also possible to specify a list of multiple parameter
            dicts for different parameter variations to be simulated.
            Default is an empty dict.
        :param str return_option:
            How to handle the simulation results. Options are:
            - 'time_series': Returns a DataFrame with the results and does not store anything.
              Only variables specified in result_names will be returned.
            - 'last_point': Returns only the last point of the simulation.
              Relevant for integral metrics like energy consumption.
              Only variables specified in result_names will be returned.
            - 'savepath': Returns the savepath where the results are stored.
              Depending on the API, different kwargs may be used to specify file type etc.
        :keyword str,Path savepath:
            If a path is provided, the relevant simulation results will be saved
            in the given directory. For multiple parameter variations, a list
            of savepaths, one for each parameter set, can also be specified.
            The savepaths for each parameter set must be unique.
            Only relevant if return_option equals 'savepath'.
            Default is the current working directory.
        :keyword str result_file_name:
            Name of the result file. Default is 'resultFile'.
            For multiple parameter variations, a list of names,
            one for each result, must be specified.
            Only relevant if return_option equals 'savepath'.
        :keyword (TimeSeriesData, pd.DataFrame) inputs:
            pandas.DataFrame of the input data for simulating the FMU with fmpy
        :keyword Boolean fail_on_error:
            If True, an error in fmpy will trigger an error in this script.
            Default is True.

        :return: str,os.path.normpath filepath:
            Only if return_option equals 'savepath'.
            Filepath of the result file.
        :return: dict:
            Only if return_option equals 'last_point'.
        :return: Union[List[pd.DataFrame],pd.DataFrame]:
            If parameters are scalar and squeeze=True,
            a DataFrame with the columns being equal to
            self.result_names.
            If multiple sets of initial values are given, one
            DataFrame for each set is returned in a list.
        """

        # Convert inputs to equally sized objects of lists:
        if parameters is None:
            parameters = [{}]
        if isinstance(parameters, dict):
            parameters = [parameters]

        if return_option not in ["time_series", "savepath", "last_point"]:
            raise ValueError(f"Given return option '{return_option}' is not supported.")

        new_kwargs = {}
        kwargs["return_option"] = return_option  # Update with arg
        n_simulations = len(parameters)
        # Handle special case for saving files:
        if return_option == "savepath" and n_simulations > 1:
            savepath = kwargs.get("savepath", self.working_directory)
            if isinstance(savepath, (str, os.PathLike, Path)):
                savepath = [savepath] * n_simulations
            result_file_name = kwargs.get("result_file_name", [])
            if isinstance(result_file_name, str):
                result_file_name = [result_file_name] * n_simulations
            if len(savepath) != len(result_file_name):
                raise ValueError("Given savepath and result_file_name "
                                 "do not have the same length.")
            joined_save_paths = []
            for _single_save_path, _single_result_name in zip(savepath, result_file_name):
                joined_save_paths.append(os.path.join(_single_save_path, _single_result_name))
            if len(set(joined_save_paths)) != n_simulations:
                raise ValueError(
                    "Simulating multiple parameter sets on "
                    "the same combination of savepath and result_file_name "
                    "will override results or even cause errors. "
                    "Specify a unique result_file_name-savepath combination "
                    "for each parameter combination."
                )
        for key, value in kwargs.items():
            if isinstance(value, list):
                if len(value) != n_simulations:
                    raise ValueError(f"Mismatch in multiprocessing of "
                                     f"given parameters ({n_simulations}) "
                                     f"and given {key} ({len(value)})")
                new_kwargs[key] = value
            else:
                new_kwargs[key] = [value] * n_simulations
        kwargs = []
        for _idx, _parameters in enumerate(parameters):
            kwargs.append(
                {"parameters": _parameters,
                 **{key: value[_idx] for key, value in new_kwargs.items()}
                 }
            )
        # Decide between mp and single core
        t_sim_start = time.time()
        if self.use_mp:
            self._n_sim_counter = 0
            self._n_sim_total = len(kwargs)
            self._progress_int = 0
            self.logger.info("Starting %s simulations on %s cores",
                             self._n_sim_total, self.n_cpu)
            results = []
            for result in self.pool.imap(self._single_simulation, kwargs):
                results.append(result)
                self._n_sim_counter += 1
                # Assuming that all workers start and finish their first simulation
                # at the same time, the time estimation begins after
                # n_cpu simulations. Otherwise, the translation and start process
                # could falsify the time estimation.
                if self._n_sim_counter == self.n_cpu:
                    t1 = time.time()
                if self._n_sim_counter > self.n_cpu:
                    self._remaining_time(t1)
                if self._n_sim_counter == 1 and return_option == 'savepath':
                    self._check_disk_space(result)
            sys.stderr.write("\r")
        else:
            # Each _single_kwargs already contains "parameters" and "return_option"
            results = [self._single_simulation(kwargs=_single_kwargs)
                       for _single_kwargs in kwargs]
        self.logger.info(f"Finished {n_simulations} simulations on {self.n_cpu} processes in "
                         f"{timedelta(seconds=int(time.time() - t_sim_start))}")
        if len(results) == 1:
            return results[0]
        return results
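
# A usage sketch for a concrete child class (model and parameter names are
# hypothetical):
# >>> parameter_variations = [{"heatCapacity": c} for c in (4180, 4200, 4220)]
# >>> results = api.simulate(parameters=parameter_variations,
# ...                        return_option="time_series")
# With n_cpu > 1 the three runs are distributed over the worker pool and a
# list of three DataFrames (one per parameter set) is returned.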

    def _remaining_time(self, t1):
        """
        Helper function to calculate the remaining simulation time and log the
        finished simulations. The function should only be used once a simulation
        has finished on each CPU used, so that the translation of the model is
        not considered in the time estimation.

        :param float t1:
            Start time after n_cpu simulations.
        """
        t_remaining = (time.time() - t1) / (self._n_sim_counter - self.n_cpu) * (
                self._n_sim_total - self._n_sim_counter)
        p_finished = self._n_sim_counter / self._n_sim_total * 100
        sys.stderr.write(f"\rFinished {np.round(p_finished, 1)} %. "
                         f"Approximately remaining time: {timedelta(seconds=int(t_remaining))} ")
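
# Worked example with hypothetical numbers: for n_cpu=4 and 100 simulations
# in total, suppose 32 s have passed since t1 when the 20th simulation
# finishes. Then t_remaining = 32 / (20 - 4) * (100 - 20) = 160 s, printed
# as "Finished 20.0 %. Approximately remaining time: 0:02:40".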

    def _check_disk_space(self, filepath):
        """
        Checks how much disk space all simulations will need on a hard drive
        and throws a warning when less than 5 % of the hard drive would remain
        free after all simulations.
        Works only for multiprocessing.
        """

        def convert_bytes(size):
            suffixes = ['B', 'KB', 'MB', 'GB', 'TB']
            suffix_idx = 0
            while size >= 1024 and suffix_idx < len(suffixes) - 1:
                suffix_idx += 1
                size = size / 1024.0
            return f'{np.round(size, 2)} {suffixes[suffix_idx]}'

        if not isinstance(filepath, (Path, str)) or not os.path.exists(filepath):
            self.logger.info(
                "Can't check disk usage as you probably used postprocessing on simulation "
                "results but did not return a file-path in the post-processing function"
            )
            return

        sim_file_size = os.stat(filepath).st_size
        sim_files_size = sim_file_size * self._n_sim_total
        self.logger.info(f"Simulation files need approximately {convert_bytes(sim_files_size)} of disk space")
        total, used, free = disk_usage(filepath)
        if sim_files_size > free - 0.05 * total:
            warnings.warn(f"{convert_bytes(free)} of free disk space on {str(filepath)[:2]} "
                          f"is not enough for all simulation files.")
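
# Example of the helper above: convert_bytes(3 * 1024**3) returns '3.0 GB'.
# The 5 % rule means that on a 500 GB drive, a warning is raised if the
# projected result files would push free space below 25 GB.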

    @abstractmethod
    def _single_simulation(self, kwargs):
        """
        Same arguments and function as simulate().
        Used to distinguish between single- and multi-processing simulation."""
        raise NotImplementedError(f'{self.__class__.__name__}._single_simulation '
                                  f'function is not defined')

    @property
    def sim_setup(self) -> SimulationSetupClass:
        """Return current sim_setup"""
        return self._sim_setup

    @sim_setup.deleter
    def sim_setup(self):
        """In case the user deletes the object, reset it to the default one."""
        self._sim_setup = self._sim_setup_class()

    def set_sim_setup(self, sim_setup):
        """
        Replaced in v0.1.7 by the sim_setup property.
        Updates the given fields of the current simulation setup.
        """
        new_setup = self._sim_setup.model_dump()
        new_setup.update(sim_setup)
        self._sim_setup = self._sim_setup_class(**new_setup)
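
# Usage sketch: only the given fields are updated, the rest keep their
# current values; unknown fields raise a ValidationError due to extra='forbid'.
# >>> api.set_sim_setup({"start_time": 0, "stop_time": 3600,
# ...                    "output_interval": 60})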

    @property
    def model_name(self) -> str:
        """Name of the model being simulated"""
        return self._model_name

    @model_name.setter
    def model_name(self, model_name: str):
        """
        Set new model_name and trigger further functions
        to load parameters etc.
        """
        # Only update if the model_name actually changes
        if self._model_name == model_name:
            return
        self._model_name = model_name
        # Only update model if it's the first setup. On multiprocessing,
        # all objects are duplicated and thus this setter is triggered again.
        # This if statement catches this case.
        if self.worker_idx and self.use_mp:
            return
        # Empty all variables again.
        self._update_model_variables()

    def _update_model_variables(self):
        """
        Function to empty all variables and update them again
        """
        self.outputs = {}
        self.parameters = {}
        self.states = {}
        self.inputs = {}
        self._update_model()
        # Set all outputs to result_names:
        self.result_names = list(self.outputs.keys())

    @abstractmethod
    def _update_model(self):
        """
        Reimplement this to change variables etc.
        based on the new model.
        """
        raise NotImplementedError(f'{self.__class__.__name__}._update_model '
                                  f'function is not defined')

    def set_working_directory(self, working_directory: Union[Path, str]):
        """Base function for changing the current working directory."""
        self.working_directory = working_directory

    @property
    def working_directory(self) -> Path:
        """Get the current working directory"""
        return self._working_directory

    @working_directory.setter
    def working_directory(self, working_directory: Union[Path, str]):
        """Set the current working directory"""
        if isinstance(working_directory, str):
            working_directory = Path(working_directory)
        os.makedirs(working_directory, exist_ok=True)
        self._working_directory = working_directory

    def set_cd(self, cd: Union[Path, str]):
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.", category=DeprecationWarning)
        self.working_directory = cd

    @property
    def cd(self) -> Path:
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.", category=DeprecationWarning)
        return self.working_directory

    @cd.setter
    def cd(self, cd: Union[Path, str]):
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.", category=DeprecationWarning)
        self.working_directory = cd

    @property
    def result_names(self) -> List[str]:
        """
        The names of the variables to store in the results.

        Returns:
            list: List of strings, where each string is the
            name of a variable to store in the result.
        """
        return self._result_names

    @result_names.setter
    def result_names(self, result_names):
        """
        Set the result names. If a name is not supported,
        a warning is logged.
        """
        self.check_unsupported_variables(variables=result_names,
                                         type_of_var="variables")
        self._result_names = result_names

    @property
    def variables(self):
        """
        All variables of the simulation model
        """
        return list(itertools.chain(self.parameters.keys(),
                                    self.outputs.keys(),
                                    self.inputs.keys(),
                                    self.states.keys()))

    def check_unsupported_variables(self, variables: List[str], type_of_var: str):
        """Log warnings if variables are not supported."""
        # If no variables are given, the model is likely not translated yet
        # and the check can't be done.
        if len(self.variables) == 0:
            return False

        if type_of_var == "parameters":
            ref = self.parameters.keys()
        elif type_of_var == "outputs":
            ref = self.outputs.keys()
        elif type_of_var == "inputs":
            ref = self.inputs.keys()
        elif type_of_var == "states":
            ref = self.states.keys()
        else:
            ref = self.variables

        diff = set(variables).difference(ref)
        if diff:
            self.logger.warning(
                "Variables '%s' not found in model '%s'. "
                "Will most probably trigger an error when simulating.",
                ', '.join(diff), self.model_name
            )
            return True
        return False

    @classmethod
    def get_simulation_setup_fields(cls):
        """Return all fields in the chosen SimulationSetup class."""
        return list(cls._sim_setup_class.__fields__.keys())
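
# For the base SimulationSetup above, this returns
# ['start_time', 'stop_time', 'output_interval', 'fixedstepsize', 'solver'].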

    def save_for_reproduction(self,
                              title: str,
                              path: pathlib.Path = None,
                              files: list = None,
                              **kwargs):
        """
        Save the settings of the SimulationAPI in order to
        reproduce the settings of the used simulation.

        Should be extended by child-classes to allow custom
        saving.

        :param str title:
            Title of the study
        :param pathlib.Path path:
            Where to store the .zip file. If not given,
            self.working_directory is used.
        :param list files:
            List of files to save along the standard ones.
            Examples would be plots, tables etc.
        :param dict kwargs:
            All keyword arguments except title, files, and path of the function
            `save_reproduction_archive`. Most importantly, `log_message` may be
            specified to avoid input during execution.
        """
        if path is None:
            path = self.working_directory
        return save_reproduction_archive(
            title=title,
            path=path,
            files=files,
            **kwargs
        )
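
# Usage sketch: passing log_message avoids the interactive prompt of
# save_reproduction_archive.
# >>> api.save_for_reproduction(
# ...     title="my_study",
# ...     log_message="Baseline simulations for the report",
# ... )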