Coverage for ebcpy/simulationapi/__init__.py: 91% of 280 statements (coverage.py v7.4.4, created at 2024-09-19 12:21 +0000)

"""
Simulation APIs help you to perform automated
simulations for energy and building climate related models.
Parameters can easily be updated, and the initialization process is
much more user-friendly than the APIs provided by Dymola or fmpy.
"""
import pathlib
import warnings
import os
import sys
import itertools
import time
from pathlib import Path
from datetime import timedelta
from typing import Dict, Union, TypeVar, Any, List
from abc import abstractmethod
import multiprocessing as mp

import pydantic
from pydantic import BaseModel, Field, field_validator
import numpy as np
from ebcpy.utils import setup_logger
from ebcpy.utils.reproduction import save_reproduction_archive
from shutil import disk_usage


class Variable(BaseModel):
    """
    Data class to store relevant information for a
    simulation variable (input, parameter, output or local/state).
    """
    type: Any = Field(
        default=None,
        title='type',
        description='Type of the variable'
    )
    value: Any = Field(
        description="Default variable value"
    )
    max: Any = Field(
        default=None,
        title='max',
        description="Maximal value (upper bound) of the variable's value. "
                    "Only for int and float variables."
    )
    min: Any = Field(
        default=None,
        title='min',
        description="Minimal value (lower bound) of the variable's value. "
                    "Only for int and float variables."
    )

    @field_validator("value")
    @classmethod
    def check_value_type(cls, value, info: pydantic.ValidationInfo):
        """Check if the given value has the correct type."""
        _type = info.data["type"]
        if _type is None:
            return value  # No type -> no conversion
        if value is None:
            return value  # Setting None is allowed.
        if not isinstance(value, _type):
            return _type(value)
        return value

    @field_validator('max', 'min')
    @classmethod
    def check_value(cls, value, info: pydantic.ValidationInfo):
        """Check if the given bounds are correct."""
        # Check if the variable type even allows for min/max bounds
        _type = info.data["type"]
        if _type is None:
            return value  # No type -> no conversion
        if _type not in (float, int, bool):
            if value is not None:
                warnings.warn(
                    "Setting a min/max for variables "
                    f"of type {_type} is not supported."
                )
            return None
        if value is not None:
            return _type(value)
        if info.field_name == "min":
            return -np.inf if _type != bool else False
        # else it is max
        return np.inf if _type != bool else True
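
# Illustrative example (not part of the source): the validators above coerce
# the value to the declared type and default missing bounds, e.g.
#   Variable(type=float, value="1", min=0)  # -> value == 1.0, min == 0.0, max == inf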


class SimulationSetup(BaseModel):
    """
    pydantic BaseModel child to define relevant
    parameters to set up the simulation.
    """
    start_time: float = Field(
        default=0,
        description="The start time of the simulation",
        title="start_time"
    )
    stop_time: float = Field(
        default=1,
        description="The stop / end time of the simulation",
        title="stop_time"
    )
    output_interval: float = Field(
        default=1,
        description="The step size of the simulation and "
                    "thus also the output interval of the results.",
        title="output_interval"
    )
    fixedstepsize: float = Field(
        title="fixedstepsize",
        default=0.0,
        description="Fixed step size for Euler"
    )
    solver: str = Field(
        title="solver",
        default="",  # Is added in the field_validator
        description="The solver to be used for numerical integration."
    )
    _default_solver: str = None
    _allowed_solvers: list = []

    @field_validator("solver")
    @classmethod
    def check_valid_solver(cls, solver):
        """
        Check if the solver is in the list of valid solvers.
        """
        if not solver:
            return cls.__private_attributes__['_default_solver'].default
        allowed_solvers = cls.__private_attributes__['_allowed_solvers'].default
        if solver not in allowed_solvers:
            raise ValueError(f"Given solver '{solver}' is not supported! "
                             f"Supported are '{allowed_solvers}'")
        return solver

    class Config:
        """Overwrite default pydantic Config"""
        extra = 'forbid'
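
# Child APIs are expected to override the private attributes above, roughly
# like this (illustrative sketch, class and solver names are assumptions):
#
#   class SimulationSetupFMU(SimulationSetup):
#       _default_solver = "CVode"
#       _allowed_solvers = ["CVode", "Euler"]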

SimulationSetupClass = TypeVar("SimulationSetupClass", bound=SimulationSetup)
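# TypeVar bound to SimulationSetup: child APIs can annotate their sim_setup
# with their own SimulationSetup subclass while keeping type checks intact.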

class SimulationAPI:
    """Base class for simulation APIs. Every simulation API class
    must inherit from this class. It defines the structure of each class.

    :param str,Path working_directory:
        Working directory path
    :param str model_name:
        Name of the model being simulated.
    :keyword int n_cpu:
        Number of cores to be used by the simulation.
        If None is given, a single core will be used.
        The maximum number equals the cpu count of the device.
        **Warning**: Logging is not yet fully working on multiple processes.
        Output will be written to the stream handler, but not to the created
        .log files.
    :keyword bool save_logs: If logs should be stored.
    """

    _sim_setup_class: SimulationSetupClass = SimulationSetup
    _items_to_drop = [
        'pool',
    ]

    def __init__(self, working_directory: Union[Path, str], model_name: str,
                 **kwargs):
        # Private helper attrs for multiprocessing
        self._n_sim_counter = 0
        self._n_sim_total = 0
        self._progress_int = 0
        # Handle deprecation warning
        self.working_directory = working_directory
        save_logs = kwargs.get("save_logs", True)
        self.logger = setup_logger(
            working_directory=self.working_directory if save_logs else None,
            name=self.__class__.__name__
        )
        # Setup the logger
        self.logger.info(f'{"-" * 25}Initializing class {self.__class__.__name__}{"-" * 25}')
        # Check multiprocessing
        self.n_cpu = kwargs.get("n_cpu", 1)
        if self.n_cpu > mp.cpu_count():
            raise ValueError(f"Given n_cpu '{self.n_cpu}' is greater "
                             "than the available number of "
                             f"cpus on your machine '{mp.cpu_count()}'")
        if self.n_cpu > 1:
            # pylint: disable=consider-using-with
            self.pool = mp.Pool(processes=self.n_cpu)
            self.use_mp = True
        else:
            self.pool = None
            self.use_mp = False
        # Setup the model
        self._sim_setup = self._sim_setup_class()
        self.inputs: Dict[str, Variable] = {}      # Inputs of model
        self.outputs: Dict[str, Variable] = {}     # Outputs of model
        self.parameters: Dict[str, Variable] = {}  # Parameters of model
        self.states: Dict[str, Variable] = {}      # States of model
        self.result_names = []
        self._model_name = None
        self.model_name = model_name
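
    # Illustrative construction through a concrete child API (the class name
    # and model name below are placeholders, not from the source):
    #   api = SomeChildAPI(working_directory="simulations",
    #                      model_name="MyModel", n_cpu=2)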

    # MP-Functions
    @property
    def worker_idx(self):
        """Index of the current worker"""
        _id = mp.current_process()._identity
        if _id:
            return _id[0]
        return None
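
    # Note: _identity is an implementation detail of multiprocessing. It is an
    # empty tuple in the main process and e.g. (1,) in the first pool worker,
    # so worker_idx is None when called on the main process.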

    def __getstate__(self):
        """Overwrite magic method to allow pickling the api object"""
        self_dict = self.__dict__.copy()
        for item in self._items_to_drop:
            del self_dict[item]
        return self_dict
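
    # Rationale: the mp.Pool in self.pool cannot be pickled, so it is listed
    # in _items_to_drop and removed before the object is sent to workers.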

    def __setstate__(self, state):
        """Overwrite magic method to allow pickling the api object"""
        self.__dict__.update(state)

    def close(self):
        """Base function for closing the simulation-program."""
        if self.use_mp:
            try:
                self.pool.map(self._close_multiprocessing,
                              list(range(self.n_cpu)))
                self.pool.close()
                self.pool.join()
            except ValueError:
                pass  # Already closed prior to atexit

    @abstractmethod
    def _close_multiprocessing(self, _):
        raise NotImplementedError(f'{self.__class__.__name__}._close_multiprocessing '
                                  'function is not defined')

    @abstractmethod
    def _single_close(self, **kwargs):
        """Base function for closing the simulation-program of a single core"""
        raise NotImplementedError(f'{self.__class__.__name__}._single_close '
                                  'function is not defined')

    @abstractmethod
    def simulate(self,
                 parameters: Union[dict, List[dict]] = None,
                 return_option: str = "time_series",
                 **kwargs):
        """
        Base function for simulating the simulation-model.

        :param dict parameters:
            Parameters to simulate.
            Names of parameters are keys, values of the dict are the values.
            It is also possible to specify a list of multiple parameter
            dicts for different parameter variations to be simulated.
            Default is an empty dict.
        :param str return_option:
            How to handle the simulation results. Options are:
            - 'time_series': Returns a DataFrame with the results and does not store anything.
              Only variables specified in result_names will be returned.
            - 'last_point': Returns only the last point of the simulation.
              Relevant for integral metrics like energy consumption.
              Only variables specified in result_names will be returned.
            - 'savepath': Returns the savepath where the results are stored.
              Depending on the API, different kwargs may be used to specify the file type etc.
        :keyword str,Path savepath:
            If a path is provided, the relevant simulation results will be saved
            in the given directory. For multiple parameter variations, a list
            of savepaths (one for each parameter set) can also be specified.
            The savepaths for each parameter set must be unique.
            Only relevant if return_option equals 'savepath'.
            Default is the current working directory.
        :keyword str result_file_name:
            Name of the result file. Default is 'resultFile'.
            For multiple parameter variations, a list of names
            (one for each result) must be specified.
            Only relevant if return_option equals 'savepath'.
        :keyword (TimeSeriesData, pd.DataFrame) inputs:
            Pandas.DataFrame of the input data for simulating the FMU with fmpy
        :keyword bool fail_on_error:
            If True, an error in fmpy will trigger an error in this script.
            Default is True.

        :return: str,os.path.normpath filepath:
            Only if return_option equals 'savepath'.
            Filepath of the result file.
        :return: dict:
            Only if return_option equals 'last_point'.
        :return: Union[List[pd.DataFrame],pd.DataFrame]:
            If parameters are scalar and squeeze=True,
            a DataFrame with the columns being equal to
            self.result_names.
            If multiple sets of initial values are given, one
            DataFrame for each set is returned in a list.
        """
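        # Illustrative call on a concrete child API (the parameter name is a
        # placeholder, not from the source):
        #   api.simulate(parameters=[{"gain": 1.0}, {"gain": 2.0}],
        #                return_option="last_point")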

        # Convert inputs to equally sized objects of lists:
        if parameters is None:
            parameters = [{}]
        if isinstance(parameters, dict):
            parameters = [parameters]

        if return_option not in ["time_series", "savepath", "last_point"]:
            raise ValueError(f"Given return option '{return_option}' is not supported.")

        new_kwargs = {}
        kwargs["return_option"] = return_option  # Update with arg
        n_simulations = len(parameters)
        # Handle special case for saving files:
        if return_option == "savepath" and n_simulations > 1:
            savepath = kwargs.get("savepath", self.working_directory)
            if isinstance(savepath, (str, os.PathLike, Path)):
                savepath = [savepath] * n_simulations
            result_file_name = kwargs.get("result_file_name", [])
            if isinstance(result_file_name, str):
                result_file_name = [result_file_name] * n_simulations
            if len(savepath) != len(result_file_name):
                raise ValueError("Given savepath and result_file_name "
                                 "do not have the same length.")
            joined_save_paths = []
            for _single_save_path, _single_result_name in zip(savepath, result_file_name):
                joined_save_paths.append(os.path.join(_single_save_path, _single_result_name))
            if len(set(joined_save_paths)) != n_simulations:
                raise ValueError(
                    "Simulating multiple parameter sets on "
                    "the same combination of savepath and result_file_name "
                    "will overwrite results or even cause errors. "
                    "Specify a unique result_file_name-savepath combination "
                    "for each parameter combination."
                )

        for key, value in kwargs.items():
            if isinstance(value, list):
                if len(value) != n_simulations:
                    raise ValueError(f"Mismatch in multiprocessing of "
                                     f"given parameters ({n_simulations}) "
                                     f"and given {key} ({len(value)})")
                new_kwargs[key] = value
            else:
                new_kwargs[key] = [value] * n_simulations
        kwargs = []
        for _idx, _parameters in enumerate(parameters):
            kwargs.append(
                {"parameters": _parameters,
                 **{key: value[_idx] for key, value in new_kwargs.items()}
                 }
            )
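        # At this point, kwargs is a list with one dict per simulation, e.g.
        # (illustrative): [{"parameters": {...}, "return_option": "time_series",
        # ...}, ...] - scalar keyword arguments were broadcast to all runs.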

        # Decide between mp and single core
        t_sim_start = time.time()
        if self.use_mp:
            self._n_sim_counter = 0
            self._n_sim_total = len(kwargs)
            self._progress_int = 0
            self.logger.info("Starting %s simulations on %s cores",
                             self._n_sim_total, self.n_cpu)
            results = []
            for result in self.pool.imap(self._single_simulation, kwargs):
                results.append(result)
                self._n_sim_counter += 1
                # Assuming that all workers start and finish their first simulation
                # at the same time, the time estimation begins after
                # n_cpu simulations. Otherwise, the translation and start process
                # could falsify the time estimation.
                if self._n_sim_counter == self.n_cpu:
                    t1 = time.time()
                if self._n_sim_counter > self.n_cpu:
                    self._remaining_time(t1)
                if self._n_sim_counter == 1 and return_option == 'savepath':
                    self._check_disk_space(result)
            sys.stderr.write("\r")
        else:
            results = [self._single_simulation(kwargs={
                "parameters": _single_kwargs["parameters"],
                "return_option": _single_kwargs["return_option"],
                **_single_kwargs
            }) for _single_kwargs in kwargs]
        self.logger.info(f"Finished {n_simulations} simulations on {self.n_cpu} processes in "
                         f"{timedelta(seconds=int(time.time() - t_sim_start))}")
        if len(results) == 1:
            return results[0]
        return results

    def _remaining_time(self, t1):
        """
        Helper function to calculate the remaining simulation time and log the finished simulations.
        The function can only be used once a simulation has finished on each CPU in use, so that
        the translation of the model does not distort the time estimation.

        :param float t1:
            Start time after n_cpu simulations.
        """
        t_remaining = (time.time() - t1) / (self._n_sim_counter - self.n_cpu) * (
                self._n_sim_total - self._n_sim_counter)
        p_finished = self._n_sim_counter / self._n_sim_total * 100
        sys.stderr.write(f"\rFinished {np.round(p_finished, 1)} %. "
                         f"Approximately remaining time: {timedelta(seconds=int(t_remaining))} ")
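
    # Worked example (illustrative): with n_cpu=2 and 10 simulations in total,
    # if 4 have finished and 60 s have passed since t1, the estimate is
    # 60 / (4 - 2) * (10 - 4) = 180 s of remaining time.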

    def _check_disk_space(self, filepath):
        """
        Checks how much disk space all simulations will need on a hard drive
        and throws a warning when less than 5 % would be free on the hard drive
        after all simulations.
        Works only for multiprocessing.
        """

        def convert_bytes(size):
            suffixes = ['B', 'KB', 'MB', 'GB', 'TB']
            suffix_idx = 0
            # Stop at the last suffix to avoid an index error for huge sizes
            while size >= 1024 and suffix_idx < len(suffixes) - 1:
                suffix_idx += 1
                size = size / 1024.0
            return f'{np.round(size, 2)} {suffixes[suffix_idx]}'
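        # e.g. convert_bytes(1536) returns '1.5 KB'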

        sim_file_size = os.stat(filepath).st_size
        sim_files_size = sim_file_size * self._n_sim_total
        self.logger.info(f"Simulation files need approximately {convert_bytes(sim_files_size)} of disk space")
        total, used, free = disk_usage(filepath)
        if sim_files_size > free - 0.05 * total:
            # filepath[:2] is e.g. the drive prefix "C:" on Windows
            warnings.warn(f"{convert_bytes(free)} of free disk space on {filepath[:2]} "
                          f"is not enough for all simulation files.")

    @abstractmethod
    def _single_simulation(self, kwargs):
        """
        Same arguments and function as simulate().
        Used to distinguish between single- and multi-processing simulation."""
        raise NotImplementedError(f'{self.__class__.__name__}._single_simulation '
                                  'function is not defined')

    @property
    def sim_setup(self) -> SimulationSetupClass:
        """Return the current sim_setup"""
        return self._sim_setup

    @sim_setup.deleter
    def sim_setup(self):
        """In case the user deletes the object, reset it to the default one."""
        self._sim_setup = self._sim_setup_class()

    def set_sim_setup(self, sim_setup):
        """
        Update the current simulation setup with the given dict.
        Replaced in v0.1.7 by the sim_setup property.
        """
        new_setup = self._sim_setup.model_dump()
        new_setup.update(sim_setup)
        self._sim_setup = self._sim_setup_class(**new_setup)
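
    # e.g. api.set_sim_setup({"start_time": 0, "stop_time": 3600,
    #                         "output_interval": 60})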

    @property
    def model_name(self) -> str:
        """Name of the model being simulated"""
        return self._model_name

    @model_name.setter
    def model_name(self, model_name: str):
        """
        Set a new model_name and trigger further functions
        to load parameters etc.
        """
        # Only update if the model_name actually changes
        if self._model_name == model_name:
            return
        self._model_name = model_name
        # Only update the model if it's the first setup. On multiprocessing,
        # all objects are duplicated and thus this setter is triggered again.
        # This if statement catches this case.
        if self.worker_idx and self.use_mp:
            return
        # Empty all variables again.
        self._update_model_variables()

    def _update_model_variables(self):
        """
        Function to empty all variables and update them again.
        """
        self.outputs = {}
        self.parameters = {}
        self.states = {}
        self.inputs = {}
        self._update_model()
        # Set all outputs to result_names:
        self.result_names = list(self.outputs.keys())

    @abstractmethod
    def _update_model(self):
        """
        Reimplement this to change variables etc.
        based on the new model.
        """
        raise NotImplementedError(f'{self.__class__.__name__}._update_model '
                                  'function is not defined')

    def set_working_directory(self, working_directory: Union[Path, str]):
        """Base function for changing the current working directory."""
        self.working_directory = working_directory

    @property
    def working_directory(self) -> Path:
        """Get the current working directory"""
        return self._working_directory

    @working_directory.setter
    def working_directory(self, working_directory: Union[Path, str]):
        """Set the current working directory"""
        if isinstance(working_directory, str):
            working_directory = Path(working_directory)
        os.makedirs(working_directory, exist_ok=True)
        self._working_directory = working_directory

    def set_cd(self, cd: Union[Path, str]):
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.", category=DeprecationWarning)
        self.working_directory = cd

    @property
    def cd(self) -> Path:
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.", category=DeprecationWarning)
        return self.working_directory

    @cd.setter
    def cd(self, cd: Union[Path, str]):
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.", category=DeprecationWarning)
        self.working_directory = cd

    @property
    def result_names(self) -> List[str]:
        """
        The variable names to store in the results.

        Returns:
            list: List of strings, where each string is the
            name of a variable to store in the result.
        """
        return self._result_names

    @result_names.setter
    def result_names(self, result_names):
        """
        Set the result names. If a name is not supported,
        a warning is logged.
        """
        self.check_unsupported_variables(variables=result_names,
                                         type_of_var="variables")
        self._result_names = result_names

    @property
    def variables(self):
        """
        All variables of the simulation model
        """
        return list(itertools.chain(self.parameters.keys(),
                                    self.outputs.keys(),
                                    self.inputs.keys(),
                                    self.states.keys()))

    def check_unsupported_variables(self, variables: List[str], type_of_var: str):
        """Log a warning if variables are not supported."""
        if type_of_var == "parameters":
            ref = self.parameters.keys()
        elif type_of_var == "outputs":
            ref = self.outputs.keys()
        elif type_of_var == "inputs":
            ref = self.inputs.keys()
        elif type_of_var == "states":
            ref = self.states.keys()
        else:
            ref = self.variables

        diff = set(variables).difference(ref)
        if diff:
            self.logger.warning(
                "Variables '%s' not found in model '%s'. "
                "Will most probably trigger an error when simulating.",
                ', '.join(diff), self.model_name
            )
            return True
        return False

    @classmethod
    def get_simulation_setup_fields(cls):
        """Return all fields in the chosen SimulationSetup class."""
        return list(cls._sim_setup_class.model_fields.keys())

    def save_for_reproduction(self,
                              title: str,
                              path: pathlib.Path = None,
                              files: list = None,
                              **kwargs):
        """
        Save the settings of the SimulationAPI in order to
        reproduce the settings of the used simulation.

        Should be extended by child classes to allow custom
        saving.

        :param str title:
            Title of the study
        :param pathlib.Path path:
            Where to store the .zip file. If not given,
            self.working_directory is used.
        :param list files:
            List of files to save along with the standard ones.
            Examples would be plots, tables etc.
        :param dict kwargs:
            All keyword arguments except title, files, and path of the function
            `save_reproduction_archive`. Most importantly, `log_message` may be
            specified to avoid input during execution.
        """
        if path is None:
            path = self.working_directory
        return save_reproduction_archive(
            title=title,
            path=path,
            files=files,
            **kwargs
        )
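
    # e.g. (illustrative): api.save_for_reproduction(title="MyStudy",
    #                                                log_message="Initial run")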