Coverage for ebcpy/simulationapi/__init__.py: 79% (284 statements)
1"""
2Simulation APIs help you to perform automated
3simulations for energy and building climate related models.
4Parameters can easily be updated, and the initialization-process is
5much more user-friendly than the provided APIs by Dymola or fmpy.
6"""
import pathlib
import warnings
import os
import sys
import itertools
import time
from pathlib import Path
from datetime import timedelta
from typing import Dict, Union, TypeVar, Any, List
from abc import abstractmethod
import multiprocessing as mp

import pydantic
from pydantic import BaseModel, Field, field_validator
import numpy as np
from ebcpy.utils import setup_logger
from ebcpy.utils.reproduction import save_reproduction_archive
from shutil import disk_usage


class Variable(BaseModel):
    """
    Data-Class to store relevant information for a
    simulation variable (input, parameter, output or local/state).
    """
    type: Any = Field(
        default=None,
        title='type',
        description='Type of the variable'
    )
    value: Any = Field(
        description="Default variable value"
    )
    max: Any = Field(
        default=None,
        title='max',
        description="Maximal value (upper bound) of the variable's value. "
                    "Only for int and float variables."
    )
    min: Any = Field(
        default=None,
        title='min',
        description="Minimal value (lower bound) of the variable's value. "
                    "Only for int and float variables."
    )

    @field_validator("value")
    @classmethod
    def check_value_type(cls, value, info: pydantic.FieldValidationInfo):
        """Check if the given value has correct type"""
        _type = info.data["type"]
        if _type is None:
            return value  # No type -> no conversion
        if value is None:
            return value  # Setting None is allowed.
        if not isinstance(value, _type):
            return _type(value)
        return value

    @field_validator('max', 'min')
    @classmethod
    def check_value(cls, value, info: pydantic.FieldValidationInfo):
        """Check if the given bounds are correct."""
        # Check if the variable type even allows for min/max bounds
        _type = info.data["type"]
        if _type is None:
            return value  # No type -> no conversion
        if _type not in (float, int, bool):
            if value is not None:
                warnings.warn(
                    "Setting a min/max for variables "
                    f"of type {_type} is not supported."
                )
            return None
        if value is not None:
            return _type(value)
        if info.field_name == "min":
            return -np.inf if _type != bool else False
        # else it is max
        return np.inf if _type != bool else True
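
# Minimal usage sketch (illustrative values): the validators coerce the given
# value to the declared type and fill in missing numeric bounds. Note that the
# bounds are passed explicitly so that the validators run on them.
#
#   >>> var = Variable(type=float, value="0.5", min=None, max=1)
#   >>> var.value, var.min, var.max
#   (0.5, -inf, 1.0)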


class SimulationSetup(BaseModel):
    """
    pydantic BaseModel child to define relevant
    parameters to set up the simulation.
    """
    start_time: float = Field(
        default=0,
        description="The start time of the simulation",
        title="start_time"
    )
    stop_time: float = Field(
        default=1,
        description="The stop / end time of the simulation",
        title="stop_time"
    )
    output_interval: float = Field(
        default=1,
        description="The step size of the simulation and "
                    "thus also output interval of results.",
        title="output_interval"
    )
    fixedstepsize: float = Field(
        title="fixedstepsize",
        default=0.0,
        description="Fixed step size for Euler"
    )
    solver: str = Field(
        title="solver",
        default="",  # Is added in the field_validator
        description="The solver to be used for numerical integration."
    )
    _default_solver: str = None
    _allowed_solvers: list = []

    @field_validator("solver")
    @classmethod
    def check_valid_solver(cls, solver):
        """
        Check if the solver is in the list of valid solvers
        """
        if not solver:
            return cls.__private_attributes__['_default_solver'].default
        allowed_solvers = cls.__private_attributes__['_allowed_solvers'].default
        if solver not in allowed_solvers:
            raise ValueError(f"Given solver '{solver}' is not supported! "
                             f"Supported are '{allowed_solvers}'")
        return solver

    class Config:
        """Overwrite default pydantic Config"""
        extra = 'forbid'
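
# Sketch of how a concrete API might pin its solver options via the private
# attributes (class name and solver names here are assumptions):
#
#   class MySimulationSetup(SimulationSetup):
#       _default_solver: str = "CVode"
#       _allowed_solvers: list = ["CVode", "Euler"]
#
#   MySimulationSetup(solver="Euler")   # OK
#   MySimulationSetup(solver="LSODA")   # raises a ValidationError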


SimulationSetupClass = TypeVar("SimulationSetupClass", bound=SimulationSetup)


class SimulationAPI:
    """Base-class for simulation apis. Every simulation-api class
    must inherit from this class. It defines the structure of each class.

    :param str,Path working_directory:
        Working directory path
    :param str model_name:
        Name of the model being simulated.
    :keyword int n_cpu:
        Number of cores to be used by simulation.
        If None is given, a single core will be used.
        Maximum number equals the cpu count of the device.
        **Warning**: Logging is not yet fully working on multiple processes.
        Output will be written to the stream handler, but not to the created
        .log files.
    :keyword bool save_logs: Whether to store logs as .log files. Default is True.
    """
    _sim_setup_class: SimulationSetupClass = SimulationSetup
    _items_to_drop = [
        'pool',
    ]

    def __init__(self, working_directory: Union[Path, str], model_name: str,
                 **kwargs):
        # Private helper attrs for multiprocessing
        self._n_sim_counter = 0
        self._n_sim_total = 0
        self._progress_int = 0
        # Handle deprecation warning
        self._working_directory = None  # Define instance attribute
        self.working_directory = Path(working_directory).absolute()
        # Setup the logger
        save_logs = kwargs.get("save_logs", True)
        self.logger = setup_logger(
            working_directory=self.working_directory if save_logs else None,
            name=self.__class__.__name__
        )
        self.logger.info(f'{"-" * 25}Initializing class {self.__class__.__name__}{"-" * 25}')
        # Check multiprocessing
        self.n_cpu = kwargs.get("n_cpu", 1)
        if self.n_cpu > mp.cpu_count():
            raise ValueError(f"Given n_cpu '{self.n_cpu}' is greater "
                             "than the available number of "
                             f"cpus on your machine '{mp.cpu_count()}'")
        if self.n_cpu > 1:
            # pylint: disable=consider-using-with
            self.pool = mp.Pool(processes=self.n_cpu)
            self.use_mp = True
        else:
            self.pool = None
            self.use_mp = False
        # Setup the model
        self._sim_setup = self._sim_setup_class()
        self.inputs: Dict[str, Variable] = {}      # Inputs of model
        self.outputs: Dict[str, Variable] = {}     # Outputs of model
        self.parameters: Dict[str, Variable] = {}  # Parameters of model
        self.states: Dict[str, Variable] = {}      # States of model
        self.result_names = []
        self._model_name = None
        self.model_name = model_name

    # MP-Functions
    @property
    def worker_idx(self):
        """Index of the current worker"""
        _id = mp.current_process()._identity
        if _id:
            return _id[0]
        return None

    def __getstate__(self):
        """Overwrite magic method to allow pickling the api object"""
        self_dict = self.__dict__.copy()
        for item in self._items_to_drop:
            del self_dict[item]
        return self_dict

    def __setstate__(self, state):
        """Overwrite magic method to allow pickling the api object"""
        self.__dict__.update(state)

    def close(self):
        """Base function for closing the simulation-program."""
        if self.use_mp:
            try:
                self.pool.map(self._close_multiprocessing,
                              list(range(self.n_cpu)))
                self.pool.close()
                self.pool.join()
            except ValueError:
                pass  # Already closed prior to atexit

    @abstractmethod
    def _close_multiprocessing(self, _):
        """Base function for closing the simulation-program on each worker."""
        raise NotImplementedError(f'{self.__class__.__name__}.close '
                                  f'function is not defined')

    @abstractmethod
    def _single_close(self, **kwargs):
        """Base function for closing the simulation-program of a single core"""
        raise NotImplementedError(f'{self.__class__.__name__}._single_close '
                                  f'function is not defined')

    @abstractmethod
    def simulate(self,
                 parameters: Union[dict, List[dict]] = None,
                 return_option: str = "time_series",
                 **kwargs):
        """
        Base function for simulating the simulation-model.

        :param dict parameters:
            Parameters to simulate.
            Names of the parameters are the keys, parameter values are
            the values of the dict.
            It is also possible to specify a list of multiple parameter
            dicts for different parameter variations to be simulated.
            Default is an empty dict.
        :param str return_option:
            How to handle the simulation results. Options are:

            - 'time_series': Returns a DataFrame with the results and does not
              store anything. Only variables specified in result_names will be
              returned.
            - 'last_point': Returns only the last point of the simulation.
              Relevant for integral metrics like energy consumption.
              Only variables specified in result_names will be returned.
            - 'savepath': Returns the savepath where the results are stored.
              Depending on the API, different kwargs may be used to specify
              the file type etc.

        :keyword str,Path savepath:
            If a path is provided, the relevant simulation results will be saved
            in the given directory. For multiple parameter variations, a list
            of savepaths, one for each parameter set, can also be specified.
            The savepath for each parameter set must be unique.
            Only relevant if return_option equals 'savepath'.
            Default is the current working directory.
        :keyword str result_file_name:
            Name of the result file. Default is 'resultFile'.
            For multiple parameter variations, a list of names,
            one for each result, must be specified.
            Only relevant if return_option equals 'savepath'.
        :keyword (TimeSeriesData, pd.DataFrame) inputs:
            pandas.DataFrame of the input data for simulating the FMU with fmpy.
        :keyword Boolean fail_on_error:
            If True, an error in fmpy will trigger an error in this script.
            Default is True.

        :return: str,os.path.normpath filepath:
            Only if return_option equals 'savepath'.
            Filepath of the result file.
        :return: dict:
            Only if return_option equals 'last_point'.
        :return: Union[List[pd.DataFrame],pd.DataFrame]:
            If parameters are scalar and squeeze=True,
            a DataFrame with the columns being equal to
            self.result_names.
            If multiple sets of initial values are given, one
            DataFrame for each set is returned in a list.
        """
        # Convert inputs to equally sized objects of lists:
        if parameters is None:
            parameters = [{}]
        if isinstance(parameters, dict):
            parameters = [parameters]

        if return_option not in ["time_series", "savepath", "last_point"]:
            raise ValueError(f"Given return option '{return_option}' is not supported.")

        new_kwargs = {}
        kwargs["return_option"] = return_option  # Update with arg
        n_simulations = len(parameters)
        # Handle special case for saving files:
        if return_option == "savepath" and n_simulations > 1:
            savepath = kwargs.get("savepath", self.working_directory)
            if isinstance(savepath, (str, os.PathLike, Path)):
                savepath = [savepath] * n_simulations
            result_file_name = kwargs.get("result_file_name", [])
            if isinstance(result_file_name, str):
                result_file_name = [result_file_name] * n_simulations
            if len(savepath) != len(result_file_name):
                raise ValueError("Given savepath and result_file_name "
                                 "do not have the same length.")
            joined_save_paths = []
            for _single_save_path, _single_result_name in zip(savepath, result_file_name):
                joined_save_paths.append(os.path.join(_single_save_path, _single_result_name))
            if len(set(joined_save_paths)) != n_simulations:
                raise ValueError(
                    "Simulating multiple parameter sets on "
                    "the same combination of savepath and result_file_name "
                    "will overwrite results or even cause errors. "
                    "Specify a unique result_file_name-savepath combination "
                    "for each parameter set."
                )
        for key, value in kwargs.items():
            if isinstance(value, list):
                if len(value) != n_simulations:
                    raise ValueError(f"Mismatch in multiprocessing of "
                                     f"given parameters ({n_simulations}) "
                                     f"and given {key} ({len(value)})")
                new_kwargs[key] = value
            else:
                new_kwargs[key] = [value] * n_simulations
        kwargs = []
        for _idx, _parameters in enumerate(parameters):
            kwargs.append(
                {"parameters": _parameters,
                 **{key: value[_idx] for key, value in new_kwargs.items()}
                 }
            )
        # Decide between mp and single core
        t_sim_start = time.time()
        if self.use_mp:
            self._n_sim_counter = 0
            self._n_sim_total = len(kwargs)
            self._progress_int = 0
            self.logger.info("Starting %s simulations on %s cores",
                             self._n_sim_total, self.n_cpu)
            results = []
            for result in self.pool.imap(self._single_simulation, kwargs):
                results.append(result)
                self._n_sim_counter += 1
                # Assuming that all workers start and finish their first
                # simulation at the same time, the time estimation begins
                # after n_cpu simulations. Otherwise, the translation and
                # start process could falsify the time estimation.
                if self._n_sim_counter == self.n_cpu:
                    t1 = time.time()
                if self._n_sim_counter > self.n_cpu:
                    self._remaining_time(t1)
                if self._n_sim_counter == 1 and return_option == 'savepath':
                    self._check_disk_space(result)
            sys.stderr.write("\r")
        else:
            results = [self._single_simulation(kwargs={
                "parameters": _single_kwargs["parameters"],
                "return_option": _single_kwargs["return_option"],
                **_single_kwargs
            }) for _single_kwargs in kwargs]
        self.logger.info(f"Finished {n_simulations} simulations on {self.n_cpu} processes in "
                         f"{timedelta(seconds=int(time.time() - t_sim_start))}")
        if len(results) == 1:
            return results[0]
        return results

    def _remaining_time(self, t1):
        """
        Helper function to calculate the remaining simulation time and log the
        finished simulations. It should only be used once a simulation has
        finished on each used cpu, so that the translation of the model is not
        considered in the time estimation.

        :param float t1:
            Start time after n_cpu simulations.
        """
        t_remaining = (time.time() - t1) / (self._n_sim_counter - self.n_cpu) * (
                self._n_sim_total - self._n_sim_counter)
        p_finished = self._n_sim_counter / self._n_sim_total * 100
        sys.stderr.write(f"\rFinished {np.round(p_finished, 1)} %. "
                         f"Approximately remaining time: {timedelta(seconds=int(t_remaining))} ")

    def _check_disk_space(self, filepath):
        """
        Checks how much disk space all simulations will need on a hard drive
        and throws a warning when less than 5 % of the hard drive would remain
        free after all simulations.
        Works only for multiprocessing.
        """

        def convert_bytes(size):
            suffixes = ['B', 'KB', 'MB', 'GB', 'TB']
            suffix_idx = 0
            # Stop at the last suffix to avoid an index error for huge sizes
            while size >= 1024 and suffix_idx < len(suffixes) - 1:
                suffix_idx += 1
                size = size / 1024.0
            return f'{str(np.round(size, 2))} {suffixes[suffix_idx]}'
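        # e.g. convert_bytes(1536) -> '1.5 KB'; convert_bytes(3 * 1024**3) -> '3.0 GB'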

        if not isinstance(filepath, (Path, str)) or not os.path.exists(filepath):
            self.logger.info(
                "Can't check disk usage as you probably used postprocessing on simulation "
                "results but did not return a file-path in the post-processing function"
            )
            return

        sim_file_size = os.stat(filepath).st_size
        sim_files_size = sim_file_size * self._n_sim_total
        self.logger.info(f"Simulation files need approximately {convert_bytes(sim_files_size)} of disk space")
        total, used, free = disk_usage(filepath)
        if sim_files_size > free - 0.05 * total:
            warnings.warn(f"{convert_bytes(free)} of free disk space on {str(filepath)[:2]} "
                          f"is not enough for all simulation files.")

    @abstractmethod
    def _single_simulation(self, kwargs):
        """
        Same arguments and function as simulate().
        Used to distinguish between single- and multi-processing simulation."""
        raise NotImplementedError(f'{self.__class__.__name__}._single_simulation '
                                  f'function is not defined')

    @property
    def sim_setup(self) -> SimulationSetupClass:
        """Return current sim_setup"""
        return self._sim_setup

    @sim_setup.deleter
    def sim_setup(self):
        """In case the user deletes the object, reset it to the default one."""
        self._sim_setup = self._sim_setup_class()

    def set_sim_setup(self, sim_setup):
        """
        Update the current sim_setup with the given dict.
        (Replaced in v0.1.7 by the property function.)
        """
        new_setup = self._sim_setup.model_dump()
        new_setup.update(sim_setup)
        self._sim_setup = self._sim_setup_class(**new_setup)
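
    # e.g. api.set_sim_setup({"start_time": 0, "stop_time": 3600})
    # updates only the given fields and re-validates the whole setup.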

    @property
    def model_name(self) -> str:
        """Name of the model being simulated"""
        return self._model_name

    @model_name.setter
    def model_name(self, model_name: str):
        """
        Set new model_name and trigger further functions
        to load parameters etc.
        """
        # Only update if the model_name actually changes
        if self._model_name == model_name:
            return
        self._model_name = model_name
        # Only update model if it's the first setup. On multiprocessing,
        # all objects are duplicated and thus this setter is triggered again.
        # This if statement catches this case.
        if self.worker_idx and self.use_mp:
            return
        # Empty all variables again.
        self._update_model_variables()

    def _update_model_variables(self):
        """
        Function to empty all variables and update them again
        """
        self.outputs = {}
        self.parameters = {}
        self.states = {}
        self.inputs = {}
        self._update_model()
        # Set all outputs to result_names:
        self.result_names = list(self.outputs.keys())

    @abstractmethod
    def _update_model(self):
        """
        Reimplement this to change variables etc.
        based on the new model.
        """
        raise NotImplementedError(f'{self.__class__.__name__}._update_model '
                                  f'function is not defined')

    def set_working_directory(self, working_directory: Union[Path, str]):
        """Base function for changing the current working directory."""
        self.working_directory = working_directory

    @property
    def working_directory(self) -> Path:
        """Get the current working directory"""
        return self._working_directory

    @working_directory.setter
    def working_directory(self, working_directory: Union[Path, str]):
        """Set the current working directory"""
        if isinstance(working_directory, str):
            working_directory = Path(working_directory)
        os.makedirs(working_directory, exist_ok=True)
        self._working_directory = working_directory

    def set_cd(self, cd: Union[Path, str]):
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.", category=DeprecationWarning)
        self.working_directory = cd

    @property
    def cd(self) -> Path:
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.", category=DeprecationWarning)
        return self.working_directory

    @cd.setter
    def cd(self, cd: Union[Path, str]):
        warnings.warn("cd was renamed to working_directory in all classes. "
                      "Use working_directory instead.", category=DeprecationWarning)
        self.working_directory = cd

    @property
    def result_names(self) -> List[str]:
        """
        The names of the variables to store in the results.

        Returns:
            list: List of strings, where each string is the
            name of a variable to store in the result.
        """
        return self._result_names

    @result_names.setter
    def result_names(self, result_names):
        """
        Set the result names. If a name is not supported,
        a warning is logged.
        """
        self.check_unsupported_variables(variables=result_names,
                                         type_of_var="variables")
        self._result_names = result_names

    @property
    def variables(self):
        """
        All variables of the simulation model
        """
        return list(itertools.chain(self.parameters.keys(),
                                    self.outputs.keys(),
                                    self.inputs.keys(),
                                    self.states.keys()))

    def check_unsupported_variables(self, variables: List[str], type_of_var: str):
        """Log warnings if variables are not supported."""
        if type_of_var == "parameters":
            ref = self.parameters.keys()
        elif type_of_var == "outputs":
            ref = self.outputs.keys()
        elif type_of_var == "inputs":
            ref = self.inputs.keys()
        elif type_of_var == "states":
            ref = self.states.keys()
        else:
            ref = self.variables

        diff = set(variables).difference(ref)
        if diff:
            self.logger.warning(
                "Variables '%s' not found in model '%s'. "
                "Will most probably trigger an error when simulating.",
                ', '.join(diff), self.model_name
            )
            return True
        return False

    @classmethod
    def get_simulation_setup_fields(cls):
        """Return all fields in the chosen SimulationSetup class."""
        return list(cls._sim_setup_class.__fields__.keys())
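
    # e.g. SimulationAPI.get_simulation_setup_fields()
    # -> ['start_time', 'stop_time', 'output_interval', 'fixedstepsize', 'solver']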

    def save_for_reproduction(self,
                              title: str,
                              path: pathlib.Path = None,
                              files: list = None,
                              **kwargs):
        """
        Save the settings of the SimulationAPI in order to
        reproduce the settings of the used simulation.

        Should be extended by child-classes to allow custom
        saving.

        :param str title:
            Title of the study
        :param pathlib.Path path:
            Where to store the .zip file. If not given,
            self.working_directory is used.
        :param list files:
            List of files to save along the standard ones.
            Examples would be plots, tables etc.
        :param dict kwargs:
            All keyword arguments except title, files, and path of the function
            `save_reproduction_archive`. Most importantly, `log_message` may be
            specified to avoid input during execution.
        """
        if path is None:
            path = self.working_directory
        return save_reproduction_archive(
            title=title,
            path=path,
            files=files,
            **kwargs
        )