Coverage for ebcpy/simulationapi/__init__.py: 91%
280 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-09-19 12:21 +0000
1"""
2Simulation APIs help you to perform automated
3simulations for energy and building climate related models.
4Parameters can easily be updated, and the initialization-process is
5much more user-friendly than the provided APIs by Dymola or fmpy.
6"""
7import pathlib
8import warnings
9import os
10import sys
11import itertools
12import time
13from pathlib import Path
14from datetime import timedelta
15from typing import Dict, Union, TypeVar, Any, List
16from abc import abstractmethod
17import multiprocessing as mp
19import pydantic
20from pydantic import BaseModel, Field, field_validator
21import numpy as np
22from ebcpy.utils import setup_logger
23from ebcpy.utils.reproduction import save_reproduction_archive
24from shutil import disk_usage
class Variable(BaseModel):
    """
    Data-Class to store relevant information for a
    simulation variable (input, parameter, output or local/state).

    The ``type`` field is declared first on purpose: the validators below
    read it via ``info.data``, which only contains already-validated fields.
    """
    type: Any = Field(
        default=None,
        title='type',
        description='Type of the variable'
    )
    value: Any = Field(
        description="Default variable value"
    )
    max: Any = Field(
        default=None,
        title='max',
        description='Maximal value (upper bound) of the variables value. '
                    'Only for ints and floats variables.'
    )
    min: Any = Field(
        default=None,
        title='min',
        description='Minimal value (lower bound) of the variables value. '
                    'Only for ints and floats variables.'
    )

    @field_validator("value")
    @classmethod
    def check_value_type(cls, value, info: pydantic.FieldValidationInfo):
        """Check if the given value has correct type"""
        _type = info.data["type"]
        if _type is None:
            return value  # No type -> no conversion
        if value is None:
            return value  # Setting None is allowed.
        if not isinstance(value, _type):
            # Cast to the declared type (e.g. "1" -> 1 for type=int).
            return _type(value)
        return value

    @field_validator('max', 'min')
    @classmethod
    def check_value(cls, value, info: pydantic.FieldValidationInfo):
        """Check if the given bounds are correct."""
        # Check if the variable type even allows for min/max bounds
        _type = info.data["type"]
        if _type is None:
            return value  # No type -> no conversion
        if _type not in (float, int, bool):
            if value is not None:
                # Bounds are silently dropped (set to None) after warning.
                warnings.warn(
                    "Setting a min/max for variables "
                    f"of type {_type} is not supported."
                )
            return None
        if value is not None:
            return _type(value)
        # No explicit bound given: fall back to the widest possible bound
        # for the type (False/True for bool, -inf/+inf for numbers).
        if info.field_name == "min":
            return -np.inf if _type != bool else False
        # else it is max
        return np.inf if _type != bool else True
class SimulationSetup(BaseModel):
    """
    pydantic BaseModel child to define relevant
    parameters to setup the simulation.
    """
    start_time: float = Field(
        default=0,
        description="The start time of the simulation",
        title="start_time"
    )
    stop_time: float = Field(
        default=1,
        description="The stop / end time of the simulation",
        title="stop_time"
    )
    output_interval: float = Field(
        default=1,
        description="The step size of the simulation and "
                    "thus also output interval of results.",
        title="output_interval"
    )
    fixedstepsize: float = Field(
        title="fixedstepsize",
        default=0.0,
        description="Fixed step size for Euler"
    )
    solver: str = Field(
        title="solver",
        default="",  # Is added in the field_validator
        description="The solver to be used for numerical integration."
    )
    # Private attributes, meant to be overwritten by child classes to
    # declare the simulator-specific default solver and allowed solvers.
    _default_solver: str = None
    _allowed_solvers: list = []

    @field_validator("solver")
    @classmethod
    def check_valid_solver(cls, solver):
        """
        Check if the solver is in the list of valid solvers
        """
        # An empty string means "use the class default solver".
        if not solver:
            return cls.__private_attributes__['_default_solver'].default
        allowed_solvers = cls.__private_attributes__['_allowed_solvers'].default
        if solver not in allowed_solvers:
            raise ValueError(f"Given solver '{solver}' is not supported! "
                             f"Supported are '{allowed_solvers}'")
        return solver

    class Config:
        """Overwrite default pydantic Config"""
        # Reject unknown keyword arguments instead of silently ignoring them.
        extra = 'forbid'
142SimulationSetupClass = TypeVar("SimulationSetupClass", bound=SimulationSetup)
class SimulationAPI:
    """Base-class for simulation apis. Every simulation-api class
    must inherit from this class. It defines the structure of each class.

    :param str,Path working_directory:
        Working directory path
    :param str model_name:
        Name of the model being simulated.
    :keyword int n_cpu:
        Number of cores to be used by simulation.
        If None is given, single core will be used.
        Maximum number equals the cpu count of the device.
        **Warning**: Logging is not yet fully working on multiple processes.
        Output will be written to the stream handler, but not to the created
        .log files.
    :keyword bool save_logs: If logs should be stored.
    """
    # Child classes may override this with their own SimulationSetup subclass.
    _sim_setup_class: SimulationSetupClass = SimulationSetup
    # Attributes removed before pickling in __getstate__
    # (multiprocessing pools cannot be pickled).
    _items_to_drop = [
        'pool',
    ]
168 def __init__(self, working_directory: Union[Path, str], model_name: str,
169 **kwargs):
170 # Private helper attrs for multiprocessing
171 self._n_sim_counter = 0
172 self._n_sim_total = 0
173 self._progress_int = 0
174 # Handle deprecation warning
175 self.working_directory = working_directory
176 save_logs = kwargs.get("save_logs", True)
177 self.logger = setup_logger(
178 working_directory=self.working_directory if save_logs else None,
179 name=self.__class__.__name__
180 )
181 # Setup the logger
182 self.logger.info(f'{"-" * 25}Initializing class {self.__class__.__name__}{"-" * 25}')
183 # Check multiprocessing
184 self.n_cpu = kwargs.get("n_cpu", 1)
185 if self.n_cpu > mp.cpu_count():
186 raise ValueError(f"Given n_cpu '{self.n_cpu}' is greater "
187 "than the available number of "
188 f"cpus on your machine '{mp.cpu_count()}'")
189 if self.n_cpu > 1:
190 # pylint: disable=consider-using-with
191 self.pool = mp.Pool(processes=self.n_cpu)
192 self.use_mp = True
193 else:
194 self.pool = None
195 self.use_mp = False
196 # Setup the model
197 self._sim_setup = self._sim_setup_class()
198 self.inputs: Dict[str, Variable] = {} # Inputs of model
199 self.outputs: Dict[str, Variable] = {} # Outputs of model
200 self.parameters: Dict[str, Variable] = {} # Parameter of model
201 self.states: Dict[str, Variable] = {} # States of model
202 self.result_names = []
203 self._model_name = None
204 self.model_name = model_name
206 # MP-Functions
207 @property
208 def worker_idx(self):
209 """Index of the current worker"""
210 _id = mp.current_process()._identity
211 if _id:
212 return _id[0]
213 return None
215 def __getstate__(self):
216 """Overwrite magic method to allow pickling the api object"""
217 self_dict = self.__dict__.copy()
218 for item in self._items_to_drop:
219 del self_dict[item]
220 return self_dict
222 def __setstate__(self, state):
223 """Overwrite magic method to allow pickling the api object"""
224 self.__dict__.update(state)
226 def close(self):
227 """Base function for closing the simulation-program."""
228 if self.use_mp:
229 try:
230 self.pool.map(self._close_multiprocessing,
231 list(range(self.n_cpu)))
232 self.pool.close()
233 self.pool.join()
234 except ValueError:
235 pass # Already closed prior to atexit
237 @abstractmethod
238 def _close_multiprocessing(self, _):
239 raise NotImplementedError(f'{self.__class__.__name__}.close '
240 f'function is not defined')
242 @abstractmethod
243 def _single_close(self, **kwargs):
244 """Base function for closing the simulation-program of a single core"""
245 raise NotImplementedError(f'{self.__class__.__name__}._single_close '
246 f'function is not defined')
    @abstractmethod
    def simulate(self,
                 parameters: Union[dict, List[dict]] = None,
                 return_option: str = "time_series",
                 **kwargs):
        """
        Base function for simulating the simulation-model.

        :param dict parameters:
            Parameters to simulate.
            Names of parameters are key, values are value of the dict.
            It is also possible to specify a list of multiple parameter
            dicts for different parameter variations to be simulated.
            Default is an empty dict.
        :param str return_option:
            How to handle the simulation results. Options are:
            - 'time_series': Returns a DataFrame with the results and does not store anything.
            Only variables specified in result_names will be returned.
            - 'last_point': Returns only the last point of the simulation.
            Relevant for integral metrics like energy consumption.
            Only variables specified in result_names will be returned.
            - 'savepath': Returns the savepath where the results are stored.
            Depending on the API, different kwargs may be used to specify file type etc.
        :keyword str,Path savepath:
            If path is provided, the relevant simulation results will be saved
            in the given directory. For multiple parameter variations also a list
            of savepaths for each parameterset can be specified.
            The savepaths for each parameter set must be unique.
            Only relevant if return_option equals 'savepath'.
            Default is the current working directory.
        :keyword str result_file_name:
            Name of the result file. Default is 'resultFile'.
            For multiple parameter variations a list of names
            for each result must be specified.
            Only relevant if return_option equals 'savepath'.
        :keyword (TimeSeriesData, pd.DataFrame) inputs:
            Pandas.Dataframe of the input data for simulating the FMU with fmpy
        :keyword Boolean fail_on_error:
            If True, an error in fmpy will trigger an error in this script.
            Default is True

        :return: str,os.path.normpath filepath:
            Only if return_option equals 'savepath'.
            Filepath of the result file.
        :return: dict:
            Only if return_option equals 'last_point'.
        :return: Union[List[pd.DataFrame],pd.DataFrame]:
            If parameters are scalar and squeeze=True,
            a DataFrame with the columns being equal to
            self.result_names.
            If multiple set's of initial values are given, one
            dataframe for each set is returned in a list
        """
        # Convert inputs to equally sized objects of lists:
        if parameters is None:
            parameters = [{}]
        if isinstance(parameters, dict):
            parameters = [parameters]

        if return_option not in ["time_series", "savepath", "last_point"]:
            raise ValueError(f"Given return option '{return_option}' is not supported.")

        new_kwargs = {}
        kwargs["return_option"] = return_option  # Update with arg
        n_simulations = len(parameters)
        # Handle special case for saving files:
        # multiple runs writing to disk must not collide on the same file.
        if return_option == "savepath" and n_simulations > 1:
            savepath = kwargs.get("savepath", self.working_directory)
            if isinstance(savepath, (str, os.PathLike, Path)):
                # A single path is reused for every parameter variation.
                savepath = [savepath] * n_simulations
            result_file_name = kwargs.get("result_file_name", [])
            if isinstance(result_file_name, str):
                result_file_name = [result_file_name] * n_simulations
            if len(savepath) != len(result_file_name):
                raise ValueError("Given savepath and result_file_name "
                                 "have not the same length.")
            joined_save_paths = []
            for _single_save_path, _single_result_name in zip(savepath, result_file_name):
                joined_save_paths.append(os.path.join(_single_save_path, _single_result_name))
            # Every (savepath, result_file_name) pair must be unique,
            # otherwise results would overwrite each other.
            if len(set(joined_save_paths)) != n_simulations:
                raise ValueError(
                    "Simulating multiple parameter set's on "
                    "the same combination of savepath and result_file_name "
                    "will override results or even cause errors. "
                    "Specify a unique result_file_name-savepath combination "
                    "for each parameter combination"
                )
        # Broadcast scalar kwargs to one entry per simulation; list-valued
        # kwargs must already contain one entry per parameter set.
        for key, value in kwargs.items():
            if isinstance(value, list):
                if len(value) != n_simulations:
                    raise ValueError(f"Mismatch in multiprocessing of "
                                     f"given parameters ({n_simulations}) "
                                     f"and given {key} ({len(value)})")
                new_kwargs[key] = value
            else:
                new_kwargs[key] = [value] * n_simulations
        # Re-bind kwargs to a list with one kwargs-dict per simulation.
        kwargs = []
        for _idx, _parameters in enumerate(parameters):
            kwargs.append(
                {"parameters": _parameters,
                 **{key: value[_idx] for key, value in new_kwargs.items()}
                 }
            )
        # Decide between mp and single core
        t_sim_start = time.time()
        if self.use_mp:
            self._n_sim_counter = 0
            self._n_sim_total = len(kwargs)
            self._progress_int = 0
            self.logger.info("Starting %s simulations on %s cores",
                             self._n_sim_total, self.n_cpu)
            results = []
            for result in self.pool.imap(self._single_simulation, kwargs):
                results.append(result)
                self._n_sim_counter += 1
                # Assuming that all worker start and finish their first simulation
                # at the same time, so that the time estimation begins after
                # n_cpu simulations. Otherwise, the translation and start process
                # could falsify the time estimation.
                if self._n_sim_counter == self.n_cpu:
                    t1 = time.time()
                if self._n_sim_counter > self.n_cpu:
                    self._remaining_time(t1)
                if self._n_sim_counter == 1 and return_option == 'savepath':
                    # Estimate total disk usage from the first result file.
                    self._check_disk_space(result)
            # Clear the in-place progress line written by _remaining_time.
            sys.stderr.write("\r")
        else:
            results = [self._single_simulation(kwargs={
                "parameters": _single_kwargs["parameters"],
                "return_option": _single_kwargs["return_option"],
                **_single_kwargs
            }) for _single_kwargs in kwargs]
        self.logger.info(f"Finished {n_simulations} simulations on {self.n_cpu} processes in "
                         f"{timedelta(seconds=int(time.time() - t_sim_start))}")
        # A single simulation returns its result directly, not a list.
        if len(results) == 1:
            return results[0]
        return results
386 def _remaining_time(self, t1):
387 """
388 Helper function to calculate the remaining simulation time and log the finished simulations.
389 The function can first be used when a simulation has finished on each used cpu, so that the
390 translation of the model is not considered in the time estimation.
392 :param float t1:
393 Start time after n_cpu simulations.
394 """
395 t_remaining = (time.time() - t1) / (self._n_sim_counter - self.n_cpu) * (
396 self._n_sim_total - self._n_sim_counter)
397 p_finished = self._n_sim_counter / self._n_sim_total * 100
398 sys.stderr.write(f"\rFinished {np.round(p_finished, 1)} %. "
399 f"Approximately remaining time: {timedelta(seconds=int(t_remaining))} ")
401 def _check_disk_space(self, filepath):
402 """
403 Checks how much disk space all simulations will need on a hard drive
404 and throws a warning when less than 5 % would be free on the hard drive
405 after all simulations.
406 Works only for multiprocessing.
407 """
409 def convert_bytes(size):
410 suffixes = ['B', 'KB', 'MB', 'GB', 'TB']
411 suffix_idx = 0
412 while size >= 1024 and suffix_idx < len(suffixes):
413 suffix_idx += 1
414 size = size / 1024.0
415 return f'{str(np.round(size, 2))} {suffixes[suffix_idx]}'
417 sim_file_size = os.stat(filepath).st_size
418 sim_files_size = sim_file_size * self._n_sim_total
419 self.logger.info(f"Simulations files need approximately {convert_bytes(sim_files_size)} of disk space")
420 total, used, free = disk_usage(filepath)
421 if sim_files_size > free - 0.05 * total:
422 warnings.warn(f"{convert_bytes(free)} of free disk space on {filepath[:2]} "
423 f"is not enough for all simulation files.")
425 @abstractmethod
426 def _single_simulation(self, kwargs):
427 """
428 Same arguments and function as simulate().
429 Used to differ between single- and multi-processing simulation"""
430 raise NotImplementedError(f'{self.__class__.__name__}._single_simulation '
431 f'function is not defined')
    @property
    def sim_setup(self) -> SimulationSetupClass:
        """Return the current simulation setup (instance of _sim_setup_class)."""
        return self._sim_setup
438 @sim_setup.deleter
439 def sim_setup(self):
440 """In case user deletes the object, reset it to the default one."""
441 self._sim_setup = self._sim_setup_class()
443 def set_sim_setup(self, sim_setup):
444 """
445 Replaced in v0.1.7 by property function
446 """
447 new_setup = self._sim_setup.dict()
448 new_setup.update(sim_setup)
449 self._sim_setup = self._sim_setup_class(**new_setup)
    @property
    def model_name(self) -> str:
        """Name of the model being simulated."""
        return self._model_name
456 @model_name.setter
457 def model_name(self, model_name: str):
458 """
459 Set new model_name and trigger further functions
460 to load parameters etc.
461 """
462 # Only update if the model_name actually changes
463 if self._model_name == model_name:
464 return
465 self._model_name = model_name
466 # Only update model if it's the first setup. On multiprocessing,
467 # all objects are duplicated and thus this setter is triggered again.
468 # This if statement catches this case.
469 if self.worker_idx and self.use_mp:
470 return
471 # Empty all variables again.
472 self._update_model_variables()
474 def _update_model_variables(self):
475 """
476 Function to empty all variables and update them again
477 """
478 self.outputs = {}
479 self.parameters = {}
480 self.states = {}
481 self.inputs = {}
482 self._update_model()
483 # Set all outputs to result_names:
484 self.result_names = list(self.outputs.keys())
486 @abstractmethod
487 def _update_model(self):
488 """
489 Reimplement this to change variables etc.
490 based on the new model.
491 """
492 raise NotImplementedError(f'{self.__class__.__name__}._update_model '
493 f'function is not defined')
    def set_working_directory(self, working_directory: Union[Path, str]):
        """Base function for changing the current working directory.

        Delegates to the working_directory property setter, which also
        creates the directory if it does not exist.
        """
        self.working_directory = working_directory
    @property
    def working_directory(self) -> Path:
        """Get the current working directory as a pathlib.Path."""
        return self._working_directory
504 @working_directory.setter
505 def working_directory(self, working_directory: Union[Path, str]):
506 """Set the current working directory"""
507 if isinstance(working_directory, str):
508 working_directory = Path(working_directory)
509 os.makedirs(working_directory, exist_ok=True)
510 self._working_directory = working_directory
512 def set_cd(self, cd: Union[Path, str]):
513 warnings.warn("cd was renamed to working_directory in all classes. "
514 "Use working_directory instead instead.", category=DeprecationWarning)
515 self.working_directory = cd
517 @property
518 def cd(self) -> Path:
519 warnings.warn("cd was renamed to working_directory in all classes. "
520 "Use working_directory instead instead.", category=DeprecationWarning)
521 return self.working_directory
523 @cd.setter
524 def cd(self, cd: Union[Path, str]):
525 warnings.warn("cd was renamed to working_directory in all classes. "
526 "Use working_directory instead instead.", category=DeprecationWarning)
527 self.working_directory = cd
    @property
    def result_names(self) -> List[str]:
        """
        The variables names which to store in results.

        Returns:
            list: List of string where the string is the
            name of the variable to store in the result.
        """
        return self._result_names
540 @result_names.setter
541 def result_names(self, result_names):
542 """
543 Set the result names. If the name is not supported,
544 an error is logged.
545 """
546 self.check_unsupported_variables(variables=result_names,
547 type_of_var="variables")
548 self._result_names = result_names
550 @property
551 def variables(self):
552 """
553 All variables of the simulation model
554 """
555 return list(itertools.chain(self.parameters.keys(),
556 self.outputs.keys(),
557 self.inputs.keys(),
558 self.states.keys()))
560 def check_unsupported_variables(self, variables: List[str], type_of_var: str):
561 """Log warnings if variables are not supported."""
562 if type_of_var == "parameters":
563 ref = self.parameters.keys()
564 elif type_of_var == "outputs":
565 ref = self.outputs.keys()
566 elif type_of_var == "inputs":
567 ref = self.inputs.keys()
568 elif type_of_var == "inputs":
569 ref = self.states.keys()
570 else:
571 ref = self.variables
573 diff = set(variables).difference(ref)
574 if diff:
575 self.logger.warning(
576 "Variables '%s' not found in model '%s'. "
577 "Will most probably trigger an error when simulating.",
578 ', '.join(diff), self.model_name
579 )
580 return True
581 return False
583 @classmethod
584 def get_simulation_setup_fields(cls):
585 """Return all fields in the chosen SimulationSetup class."""
586 return list(cls._sim_setup_class.__fields__.keys())
588 def save_for_reproduction(self,
589 title: str,
590 path: pathlib.Path = None,
591 files: list = None,
592 **kwargs):
593 """
594 Save the settings of the SimulationAPI in order to
595 reproduce the settings of the used simulation.
597 Should be extended by child-classes to allow custom
598 saving.
600 :param str title:
601 Title of the study
602 :param pathlib.Path path:
603 Where to store the .zip file. If not given, self.cd is used.
604 :param list files:
605 List of files to save along the standard ones.
606 Examples would be plots, tables etc.
607 :param dict kwargs:
608 All keyword arguments except title, files, and path of the function
609 `save_reproduction_archive`. Most importantly, `log_message` may be
610 specified to avoid input during execution.
611 """
612 if path is None:
613 path = self.cd
614 return save_reproduction_archive(
615 title=title,
616 path=path,
617 files=files,
618 **kwargs
619 )