Coverage for ebcpy/simulationapi/__init__.py: 92%

283 statements  

coverage.py v7.4.4, created at 2025-01-04 08:02 +0000

1""" 

2Simulation APIs help you to perform automated 

3simulations for energy and building climate related models. 

4Parameters can easily be updated, and the initialization-process is 

5much more user-friendly than the provided APIs by Dymola or fmpy. 

6""" 

import pathlib
import warnings
import os
import sys
import itertools
import time
from pathlib import Path
from datetime import timedelta
from shutil import disk_usage
from typing import Dict, Union, TypeVar, Any, List
from abc import abstractmethod
import multiprocessing as mp

import pydantic
from pydantic import BaseModel, Field, field_validator
import numpy as np

from ebcpy.utils import setup_logger
from ebcpy.utils.reproduction import save_reproduction_archive


class Variable(BaseModel):
    """
    Data-Class to store relevant information for a
    simulation variable (input, parameter, output or local/state).
    """
    type: Any = Field(
        default=None,
        title='type',
        description='Type of the variable'
    )
    value: Any = Field(
        description="Default variable value"
    )
    max: Any = Field(
        default=None,
        title='max',
        description="Maximal value (upper bound) of the variable's value. "
                    "Only for int and float variables."
    )
    min: Any = Field(
        default=None,
        title='min',
        description="Minimal value (lower bound) of the variable's value. "
                    "Only for int and float variables."
    )

    @field_validator("value")
    @classmethod
    def check_value_type(cls, value, info: pydantic.FieldValidationInfo):
        """Check if the given value has the correct type"""
        _type = info.data["type"]
        if _type is None:
            return value  # No type -> no conversion
        if value is None:
            return value  # Setting None is allowed.
        if not isinstance(value, _type):
            return _type(value)
        return value

    @field_validator('max', 'min')
    @classmethod
    def check_value(cls, value, info: pydantic.FieldValidationInfo):
        """Check if the given bounds are correct."""
        # Check if the variable type even allows for min/max bounds
        _type = info.data["type"]
        if _type is None:
            return value  # No type -> no conversion
        if _type not in (float, int, bool):
            if value is not None:
                warnings.warn(
                    "Setting a min/max for variables "
                    f"of type {_type} is not supported."
                )
            return None
        if value is not None:
            return _type(value)
        if info.field_name == "min":
            return -np.inf if _type != bool else False
        # else it is max
        return np.inf if _type != bool else True
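
# Example (illustrative sketch, not part of the module): how the validators
# above behave. The values are made up for demonstration:
#
#   var = Variable(type=float, value="1.5", min=None, max=None)
#   var.value   # 1.5   -> the string was coerced to float
#   var.min     # -inf  -> an empty bound defaults to -inf / +inf for numbers
#   var.max     # inf
#   Variable(type=str, value="on", max=1)  # warns and sets max to None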


class SimulationSetup(BaseModel):
    """
    pydantic BaseModel child to define relevant
    parameters to set up the simulation.
    """
    start_time: float = Field(
        default=0,
        description="The start time of the simulation",
        title="start_time"
    )
    stop_time: float = Field(
        default=1,
        description="The stop / end time of the simulation",
        title="stop_time"
    )
    output_interval: float = Field(
        default=1,
        description="The step size of the simulation and "
                    "thus also the output interval of results.",
        title="output_interval"
    )
    fixedstepsize: float = Field(
        title="fixedstepsize",
        default=0.0,
        description="Fixed step size for Euler"
    )
    solver: str = Field(
        title="solver",
        default="",  # Is added in the field_validator
        description="The solver to be used for numerical integration."
    )
    _default_solver: str = None
    _allowed_solvers: list = []

    @field_validator("solver")
    @classmethod
    def check_valid_solver(cls, solver):
        """
        Check if the solver is in the list of valid solvers
        """
        if not solver:
            return cls.__private_attributes__['_default_solver'].default
        allowed_solvers = cls.__private_attributes__['_allowed_solvers'].default
        if solver not in allowed_solvers:
            raise ValueError(f"Given solver '{solver}' is not supported! "
                             f"Supported are '{allowed_solvers}'")
        return solver

    class Config:
        """Overwrite default pydantic Config"""
        extra = 'forbid'
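
# Example (illustrative sketch): simulation APIs subclass SimulationSetup to
# declare their supported solvers. The solver names below are hypothetical,
# not the actual lists of the Dymola or FMU APIs:
#
#   class MySimulationSetup(SimulationSetup):
#       _default_solver: str = "dassl"
#       _allowed_solvers: list = ["dassl", "euler"]
#
#   MySimulationSetup(stop_time=3600)   # solver defaults to "dassl"
#   MySimulationSetup(solver="rk4")     # raises: solver not allowed
#   MySimulationSetup(typo_field=1)     # raises: extra = 'forbid'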


SimulationSetupClass = TypeVar("SimulationSetupClass", bound=SimulationSetup)


class SimulationAPI:
    """Base-class for simulation apis. Every simulation-api class
    must inherit from this class. It defines the structure of each class.

    :param str,Path working_directory:
        Working directory path
    :param str model_name:
        Name of the model being simulated.
    :keyword int n_cpu:
        Number of cores to be used by the simulation.
        If None is given, a single core will be used.
        The maximum number equals the cpu count of the device.
        **Warning**: Logging is not yet fully working on multiple processes.
        Output will be written to the stream handler, but not to the created
        .log files.
    :keyword bool save_logs: If logs should be stored.
    """
    _sim_setup_class: SimulationSetupClass = SimulationSetup
    _items_to_drop = [
        'pool',
    ]

    def __init__(self, working_directory: Union[Path, str], model_name: str,
                 **kwargs):
        # Private helper attrs for multiprocessing
        self._n_sim_counter = 0
        self._n_sim_total = 0
        self._progress_int = 0
        # Handle deprecation warning
        self.working_directory = working_directory
        save_logs = kwargs.get("save_logs", True)
        # Setup the logger
        self.logger = setup_logger(
            working_directory=self.working_directory if save_logs else None,
            name=self.__class__.__name__
        )
        self.logger.info(f'{"-" * 25}Initializing class {self.__class__.__name__}{"-" * 25}')
        # Check multiprocessing
        self.n_cpu = kwargs.get("n_cpu", 1)
        if self.n_cpu > mp.cpu_count():
            raise ValueError(f"Given n_cpu '{self.n_cpu}' is greater "
                             "than the available number of "
                             f"cpus on your machine '{mp.cpu_count()}'")
        if self.n_cpu > 1:
            # pylint: disable=consider-using-with
            self.pool = mp.Pool(processes=self.n_cpu)
            self.use_mp = True
        else:
            self.pool = None
            self.use_mp = False
        # Setup the model
        self._sim_setup = self._sim_setup_class()
        self.inputs: Dict[str, Variable] = {}       # Inputs of model
        self.outputs: Dict[str, Variable] = {}      # Outputs of model
        self.parameters: Dict[str, Variable] = {}   # Parameters of model
        self.states: Dict[str, Variable] = {}       # States of model
        self.result_names = []
        self._model_name = None
        self.model_name = model_name

    # MP-Functions
    @property
    def worker_idx(self):
        """Index of the current worker"""
        _id = mp.current_process()._identity
        if _id:
            return _id[0]
        return None

    def __getstate__(self):
        """Overwrite magic method to allow pickling the api object"""
        self_dict = self.__dict__.copy()
        for item in self._items_to_drop:
            del self_dict[item]
        return self_dict

    def __setstate__(self, state):
        """Overwrite magic method to allow pickling the api object"""
        self.__dict__.update(state)

    def close(self):
        """Base function for closing the simulation-program."""
        if self.use_mp:
            try:
                self.pool.map(self._close_multiprocessing,
                              list(range(self.n_cpu)))
                self.pool.close()
                self.pool.join()
            except ValueError:
                pass  # Already closed prior to atexit

    @abstractmethod
    def _close_multiprocessing(self, _):
        raise NotImplementedError(f'{self.__class__.__name__}.close '
                                  f'function is not defined')

    @abstractmethod
    def _single_close(self, **kwargs):
        """Base function for closing the simulation-program of a single core"""
        raise NotImplementedError(f'{self.__class__.__name__}._single_close '
                                  f'function is not defined')
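
    # A minimal subclass sketch (illustrative, not a real ebcpy API): concrete
    # APIs implement the abstract hooks of this class. All names inside the
    # sketch are hypothetical:
    #
    #   class DummyAPI(SimulationAPI):
    #       def _update_model(self):
    #           self.outputs = {"y": Variable(type=float, value=0.0)}
    #
    #       def _single_simulation(self, kwargs):
    #           return {"y": 42.0}  # a real API would run one simulation here
    #
    #       def _close_multiprocessing(self, _):
    #           pass
    #
    #       def _single_close(self, **kwargs):
    #           pass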

    @abstractmethod
    def simulate(self,
                 parameters: Union[dict, List[dict]] = None,
                 return_option: str = "time_series",
                 **kwargs):
        """
        Base function for simulating the simulation-model.

        :param dict parameters:
            Parameters to simulate.
            Names of parameters are keys, parameter values are the
            values of the dict.
            It is also possible to specify a list of multiple parameter
            dicts for different parameter variations to be simulated.
            Default is an empty dict.
        :param str return_option:
            How to handle the simulation results. Options are:
            - 'time_series': Returns a DataFrame with the results and does not store anything.
              Only variables specified in result_names will be returned.
            - 'last_point': Returns only the last point of the simulation.
              Relevant for integral metrics like energy consumption.
              Only variables specified in result_names will be returned.
            - 'savepath': Returns the savepath where the results are stored.
              Depending on the API, different kwargs may be used to specify the file type etc.
        :keyword str,Path savepath:
            If a path is provided, the relevant simulation results will be saved
            in the given directory. For multiple parameter variations, a list
            of savepaths for each parameter set can also be specified.
            The savepaths for each parameter set must be unique.
            Only relevant if return_option equals 'savepath'.
            Default is the current working directory.
        :keyword str result_file_name:
            Name of the result file. Default is 'resultFile'.
            For multiple parameter variations, a list of names
            for each result must be specified.
            Only relevant if return_option equals 'savepath'.
        :keyword (TimeSeriesData, pd.DataFrame) inputs:
            pandas.DataFrame of the input data for simulating the FMU with fmpy
        :keyword Boolean fail_on_error:
            If True, an error in fmpy will trigger an error in this script.
            Default is True

        :return: str,os.path.normpath filepath:
            Only if return_option equals 'savepath'.
            Filepath of the result file.
        :return: dict:
            Only if return_option equals 'last_point'.
        :return: Union[List[pd.DataFrame], pd.DataFrame]:
            Only if return_option equals 'time_series'.
            If a single parameter set is given, a DataFrame with
            columns equal to self.result_names is returned.
            If multiple parameter sets are given, one DataFrame
            for each set is returned in a list.
        """
        # Convert inputs to equally sized objects of lists:
        if parameters is None:
            parameters = [{}]
        if isinstance(parameters, dict):
            parameters = [parameters]

        if return_option not in ["time_series", "savepath", "last_point"]:
            raise ValueError(f"Given return option '{return_option}' is not supported.")

        new_kwargs = {}
        kwargs["return_option"] = return_option  # Update with arg
        n_simulations = len(parameters)
        # Handle special case for saving files:
        if return_option == "savepath" and n_simulations > 1:
            savepath = kwargs.get("savepath", self.working_directory)
            if isinstance(savepath, (str, os.PathLike, Path)):
                savepath = [savepath] * n_simulations
            result_file_name = kwargs.get("result_file_name", [])
            if isinstance(result_file_name, str):
                result_file_name = [result_file_name] * n_simulations
            if len(savepath) != len(result_file_name):
                raise ValueError("Given savepath and result_file_name "
                                 "do not have the same length.")
            joined_save_paths = []
            for _single_save_path, _single_result_name in zip(savepath, result_file_name):
                joined_save_paths.append(os.path.join(_single_save_path, _single_result_name))
            if len(set(joined_save_paths)) != n_simulations:
                raise ValueError(
                    "Simulating multiple parameter sets on "
                    "the same combination of savepath and result_file_name "
                    "will overwrite results or even cause errors. "
                    "Specify a unique result_file_name-savepath combination "
                    "for each parameter set"
                )
        for key, value in kwargs.items():
            if isinstance(value, list):
                if len(value) != n_simulations:
                    raise ValueError(f"Mismatch in multiprocessing of "
                                     f"given parameters ({n_simulations}) "
                                     f"and given {key} ({len(value)})")
                new_kwargs[key] = value
            else:
                new_kwargs[key] = [value] * n_simulations
        kwargs = []
        for _idx, _parameters in enumerate(parameters):
            kwargs.append(
                {"parameters": _parameters,
                 **{key: value[_idx] for key, value in new_kwargs.items()}
                 }
            )
        # Decide between mp and single core
        t_sim_start = time.time()
        if self.use_mp:
            self._n_sim_counter = 0
            self._n_sim_total = len(kwargs)
            self._progress_int = 0
            self.logger.info("Starting %s simulations on %s cores",
                             self._n_sim_total, self.n_cpu)
            results = []
            for result in self.pool.imap(self._single_simulation, kwargs):
                results.append(result)
                self._n_sim_counter += 1
                # Assuming that all workers start and finish their first
                # simulation at the same time, the time estimation begins
                # after n_cpu simulations. Otherwise, the translation and
                # start-up process could falsify the time estimation.
                if self._n_sim_counter == self.n_cpu:
                    t1 = time.time()
                if self._n_sim_counter > self.n_cpu:
                    self._remaining_time(t1)
                if self._n_sim_counter == 1 and return_option == 'savepath':
                    self._check_disk_space(result)
            sys.stderr.write("\r")
        else:
            # Each _single_kwargs dict already contains "parameters" and
            # "return_option", so it can be passed through directly.
            results = [self._single_simulation(kwargs=_single_kwargs)
                       for _single_kwargs in kwargs]
        self.logger.info(f"Finished {n_simulations} simulations on {self.n_cpu} processes in "
                         f"{timedelta(seconds=int(time.time() - t_sim_start))}")
        if len(results) == 1:
            return results[0]
        return results
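
    # Example (illustrative) of the three return options on some concrete api
    # instance; "heatCapacity" is a hypothetical parameter name:
    #
    #   df = api.simulate(parameters={"heatCapacity": 4180})
    #   dfs = api.simulate(parameters=[{"heatCapacity": c}
    #                                  for c in (2000, 4180)])  # list of results
    #   last = api.simulate(parameters={}, return_option="last_point")
    #   path = api.simulate(parameters={}, return_option="savepath",
    #                       result_file_name="run1")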

    def _remaining_time(self, t1):
        """
        Helper function to calculate the remaining simulation time and log
        the finished simulations. The function can only be used once a
        simulation has finished on each used cpu, so that the translation
        of the model is not considered in the time estimation.

        :param float t1:
            Start time after n_cpu simulations.
        """
        t_remaining = (time.time() - t1) / (self._n_sim_counter - self.n_cpu) * (
                self._n_sim_total - self._n_sim_counter)
        p_finished = self._n_sim_counter / self._n_sim_total * 100
        sys.stderr.write(f"\rFinished {np.round(p_finished, 1)} %. "
                         f"Approximately remaining time: {timedelta(seconds=int(t_remaining))} ")
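
    # Worked example (illustrative): with n_cpu = 4, 10 of 20 simulations done,
    # and 60 s elapsed since the first four finished, the average time per
    # simulation is 60 / (10 - 4) = 10 s, so the estimate for the remaining
    # 20 - 10 = 10 runs is 10 * 10 = 100 s.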

    def _check_disk_space(self, filepath):
        """
        Checks how much disk space all simulations will need on a hard drive
        and throws a warning when less than 5 % of the hard drive would be
        free after all simulations.
        Works only for multiprocessing.
        """

        def convert_bytes(size):
            suffixes = ['B', 'KB', 'MB', 'GB', 'TB']
            suffix_idx = 0
            while size >= 1024 and suffix_idx < len(suffixes) - 1:
                suffix_idx += 1
                size = size / 1024.0
            return f'{str(np.round(size, 2))} {suffixes[suffix_idx]}'

        if not isinstance(filepath, (Path, str)) or not os.path.exists(filepath):
            self.logger.info(
                "Can't check disk usage as you probably used postprocessing on simulation "
                "results but did not return a file-path in the post-processing function"
            )
            return

        sim_file_size = os.stat(filepath).st_size
        sim_files_size = sim_file_size * self._n_sim_total
        self.logger.info(
            f"Simulation files need approximately {convert_bytes(sim_files_size)} of disk space"
        )
        total, _, free = disk_usage(filepath)
        if sim_files_size > free - 0.05 * total:
            warnings.warn(f"{convert_bytes(free)} of free disk space on "
                          f"{str(filepath)[:2]} "  # drive prefix, e.g. 'C:'
                          f"is not enough for all simulation files.")
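
    # Example (illustrative): convert_bytes(3 * 1024 ** 2) returns '3.0 MB'.
    # The warning keeps a 5 % safety margin: it triggers as soon as the
    # projected result files would push free space below 5 % of the drive's
    # total capacity.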

    @abstractmethod
    def _single_simulation(self, kwargs):
        """
        Same arguments and function as simulate().
        Used to distinguish between single- and multi-processing simulation.
        """
        raise NotImplementedError(f'{self.__class__.__name__}._single_simulation '
                                  f'function is not defined')

    @property
    def sim_setup(self) -> SimulationSetupClass:
        """Return the current sim_setup"""
        return self._sim_setup

    @sim_setup.deleter
    def sim_setup(self):
        """In case the user deletes the object, reset it to the default one."""
        self._sim_setup = self._sim_setup_class()

    def set_sim_setup(self, sim_setup):
        """
        Update the current simulation setup with the given dict.
        Replaced in v0.1.7 by the sim_setup property.
        """
        new_setup = self._sim_setup.model_dump()
        new_setup.update(sim_setup)
        self._sim_setup = self._sim_setup_class(**new_setup)
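
    # Example (illustrative): update only the fields that change; all other
    # fields keep their current values, and unknown names raise an error
    # because of extra = 'forbid':
    #
    #   api.set_sim_setup({"start_time": 0, "stop_time": 86400})
    #   api.sim_setup.output_interval  # unchanged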

    @property
    def model_name(self) -> str:
        """Name of the model being simulated"""
        return self._model_name

    @model_name.setter
    def model_name(self, model_name: str):
        """
        Set a new model_name and trigger further functions
        to load parameters etc.
        """
        # Only update if the model_name actually changes
        if self._model_name == model_name:
            return
        self._model_name = model_name
        # Only update the model if it's the first setup. On multiprocessing,
        # all objects are duplicated and thus this setter is triggered again.
        # This if-statement catches that case.
        if self.worker_idx and self.use_mp:
            return
        # Empty all variables again.
        self._update_model_variables()

    def _update_model_variables(self):
        """
        Function to empty all variables and update them again
        """
        self.outputs = {}
        self.parameters = {}
        self.states = {}
        self.inputs = {}
        self._update_model()
        # Set all outputs to result_names:
        self.result_names = list(self.outputs.keys())

    @abstractmethod
    def _update_model(self):
        """
        Reimplement this to change variables etc.
        based on the new model.
        """
        raise NotImplementedError(f'{self.__class__.__name__}._update_model '
                                  f'function is not defined')

    def set_working_directory(self, working_directory: Union[Path, str]):
        """Base function for changing the current working directory."""
        self.working_directory = working_directory

    @property
    def working_directory(self) -> Path:
        """Get the current working directory"""
        return self._working_directory

    @working_directory.setter
    def working_directory(self, working_directory: Union[Path, str]):
        """Set the current working directory"""
        if isinstance(working_directory, str):
            working_directory = Path(working_directory)
        os.makedirs(working_directory, exist_ok=True)
        self._working_directory = working_directory

    def set_cd(self, cd: Union[Path, str]):
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.", category=DeprecationWarning)
        self.working_directory = cd

    @property
    def cd(self) -> Path:
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.", category=DeprecationWarning)
        return self.working_directory

    @cd.setter
    def cd(self, cd: Union[Path, str]):
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.", category=DeprecationWarning)
        self.working_directory = cd

    @property
    def result_names(self) -> List[str]:
        """
        The variable names to store in the results.

        Returns:
            list: List of strings, where each string is the
            name of a variable to store in the result.
        """
        return self._result_names

    @result_names.setter
    def result_names(self, result_names):
        """
        Set the result names. If a name is not supported,
        a warning is logged.
        """
        self.check_unsupported_variables(variables=result_names,
                                         type_of_var="variables")
        self._result_names = result_names

    @property
    def variables(self):
        """
        All variables of the simulation model
        """
        return list(itertools.chain(self.parameters.keys(),
                                    self.outputs.keys(),
                                    self.inputs.keys(),
                                    self.states.keys()))

    def check_unsupported_variables(self, variables: List[str], type_of_var: str):
        """Log warnings if variables are not supported."""
        if type_of_var == "parameters":
            ref = self.parameters.keys()
        elif type_of_var == "outputs":
            ref = self.outputs.keys()
        elif type_of_var == "inputs":
            ref = self.inputs.keys()
        elif type_of_var == "states":
            ref = self.states.keys()
        else:
            ref = self.variables

        diff = set(variables).difference(ref)
        if diff:
            self.logger.warning(
                "Variables '%s' not found in model '%s'. "
                "Will most probably trigger an error when simulating.",
                ', '.join(diff), self.model_name
            )
            return True
        return False
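
    # Example (illustrative): restrict the stored results to two outputs.
    # "heater.Q_flow" and "room.T" are hypothetical variable names:
    #
    #   api.result_names = ["heater.Q_flow", "room.T"]
    #   api.check_unsupported_variables(["no.such.var"], "outputs")  # warns, True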

    @classmethod
    def get_simulation_setup_fields(cls):
        """Return all fields in the chosen SimulationSetup class."""
        return list(cls._sim_setup_class.__fields__.keys())

    def save_for_reproduction(self,
                              title: str,
                              path: pathlib.Path = None,
                              files: list = None,
                              **kwargs):
        """
        Save the settings of the SimulationAPI in order to
        reproduce the settings of the used simulation.

        Should be extended by child-classes to allow custom
        saving.

        :param str title:
            Title of the study
        :param pathlib.Path path:
            Where to store the .zip file. If not given,
            self.working_directory is used.
        :param list files:
            List of files to save along with the standard ones.
            Examples would be plots, tables etc.
        :param dict kwargs:
            All keyword arguments except title, files, and path of the function
            `save_reproduction_archive`. Most importantly, `log_message` may be
            specified to avoid input during execution.
        """
        if path is None:
            path = self.working_directory
        return save_reproduction_archive(
            title=title,
            path=path,
            files=files,
            **kwargs
        )
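
    # Example (illustrative): archive the current study settings; the title
    # and log message are placeholders:
    #
    #   zip_file = api.save_for_reproduction(
    #       title="my_study",
    #       log_message="Baseline run with default parameters"
    #   )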