Source code for uesgraphs.teaser_integration.simulate

"""Module to control multiprocessing of simulation of TEASER buildings.

You need to set a couple of environment variables for your conda environment to
use this module:

PYTHONPATH

"""
import os
import numpy as np
import multiprocessing

import logging

def set_up_terminal_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """Return a console-only logger registered under ``name``.

    The first call for a given name attaches one stream handler and switches
    off propagation to the root logger so that messages are not printed twice.
    Any later call for the same name hands back the already configured logger
    untouched; ``level`` is ignored in that case.

    Args:
        name: Logger name
        level: Logging level applied when the logger is first configured
            (default: INFO)

    Returns:
        Configured console logger
    """
    terminal_logger = logging.getLogger(name)

    # Only configure a logger that has no handler yet; this keeps repeated
    # calls from stacking duplicate handlers.
    if not terminal_logger.handlers:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(
            logging.Formatter('%(name)s - %(levelname)s - %(message)s')
        )
        terminal_logger.setLevel(level)
        terminal_logger.addHandler(stream_handler)
        # Keep records away from the root logger to avoid double output.
        terminal_logger.propagate = False

    return terminal_logger
try: from dymola.dymola_interface import DymolaInterface except ImportError: DymolaInterface = None
class WorkerSimulation(multiprocessing.Process):
    """Process wrapper that runs one simulation function call.

    The class derives from ``multiprocessing.Process``. It stores a simulation
    function together with every argument that function needs; once the
    process runs, the function is called with those arguments and its return
    value is put on ``result_queue``.

    Parameters
    ----------
    sim_function : callable
        Simulation function, e.g. ``simulate()``. It is called with keyword
        arguments only (every parameter below except ``sim_function`` and
        ``result_queue``).
    sim_part : Django QuerySet, NumPy Array or any other iterable
        Iterable collection of building objects handled by this worker.
    prj : Scenario instance
        Scenario instance of the buildings that are simulated. Please ensure
        if you are using multiprocessing that all buildings are within the
        same prj.
    model_path : str
        Path where Modelica model from TEASER is located. This is TEASER
        output path.
    results_path : str
        Path where Dymola results should be stored.
    start_time : int
        Start time of the simulation
    stop_time : int
        Stop time of the simulation
    output_interval : int
        Output interval of the simulation
    method : str
        Solver used in Dymola, e.g. 'Dassl'.
    tolerance : float
        Tolerance of used solver
    process_number : int
        Individual number of this worker; forwarded to ``sim_function``.
    result_queue : multiprocessing.JoinableQueue()
        Queue that receives the return value of ``sim_function``.
    aixlib_path : str
        Path forwarded to ``sim_function`` as ``aixlib_path``.
    """

    def __init__(
        self,
        sim_function,
        sim_part,
        prj,
        model_path,
        results_path,
        start_time,
        stop_time,
        output_interval,
        method,
        tolerance,
        process_number,
        result_queue,
        aixlib_path,
    ):
        """Store the simulation function and all of its arguments."""
        super().__init__()

        # What to run and where the outcome goes.
        self.sim_function = sim_function
        self.result_queue = result_queue
        self.process_number = process_number

        # What to simulate and where the models/results live.
        self.sim_part = sim_part
        self.prj = prj
        self.model_path = model_path
        self.results_path = results_path
        self.aixlib_path = aixlib_path

        # Simulation and solver settings.
        self.start_time = start_time
        self.stop_time = stop_time
        self.output_interval = output_interval
        self.method = method
        self.tolerance = tolerance

    def run(self):
        """Call the simulation function and queue whatever it returns."""
        arguments = {
            "sim_part": self.sim_part,
            "prj": self.prj,
            "model_path": self.model_path,
            "results_path": self.results_path,
            "start_time": self.start_time,
            "stop_time": self.stop_time,
            "output_interval": self.output_interval,
            "method": self.method,
            "tolerance": self.tolerance,
            "process_number": self.process_number,
            "aixlib_path": self.aixlib_path,
        }
        outcome = self.sim_function(**arguments)
        self.result_queue.put(outcome)
def simulate(
    sim_part,
    prj,
    model_path,
    results_path,
    start_time,
    stop_time,
    output_interval,
    method,
    tolerance,
    process_number,
    aixlib_path,
):
    """Simulate building models in serial using Dymola.

    This function simulates all buildings given in ``sim_part`` in serial
    (one after another). It can be used as the work function for parallel
    processing of simulations. There are no default values because they
    caused trouble with multiprocessing.

    Results are written to ``<results_path>/<prj.name>/<bldg.name>``; Dymola's
    working directory is set to ``<results_path>/temp<process_number>``. Both
    directories are created if missing. Dymola is closed and started again
    every 20 buildings, and it is always closed when the function leaves,
    including when a Dymola call raises.

    Note: Please ensure that for all buildings corresponding models have been
    generated. Also ensure that all models are within the same prj, otherwise
    this will neither work for single nor for multiprocessing.

    Parameters
    ----------
    sim_part : any iterable
        Iterable collection of Building objects (each needs a ``name``).
    prj : Scenario instance
        Scenario instance of the buildings that are simulated. Please ensure
        if you are using multiprocessing that all buildings are within the
        same prj.
    model_path : str
        Path where Modelica model from TEASER is located. This is TEASER
        output path; ``<model_path>/<prj.name>/package.mo`` is opened.
    results_path : str
        Path where Dymola results should be stored.
    start_time : int
        Start time of the simulation
    stop_time : int
        Stop time of the simulation
    output_interval : int
        Output interval of the simulation
    method : str
        Solver used in Dymola, e.g. 'Dassl'.
    tolerance : float
        Tolerance of used solver
    process_number : int
        Number of this worker. It is used for the logger name and the
        temporary working directory, so give every parallel process its own
        number.
    aixlib_path : str
        Directory whose ``package.mo`` is opened in Dymola before the project
        model. Must be a path, not None.

    Raises
    ------
    ImportError
        If a Dymola session has to be started but the Dymola Python interface
        could not be imported.
    """
    logger = set_up_terminal_logger("simulate_worker_" + str(process_number))

    SIMULATIONS_BEFORE_RESTART = 20

    dir_result = os.path.join(results_path, prj.name)
    dir_temp = os.path.join(results_path, "temp" + str(process_number))

    # dir_result is shared by all parallel workers: a check-then-create
    # sequence can fail with FileExistsError when two workers race, so let
    # makedirs tolerate an existing directory instead.
    os.makedirs(dir_result, exist_ok=True)
    os.makedirs(dir_temp, exist_ok=True)

    dymola = None
    try:
        for count, bldg in enumerate(sim_part):
            if count % SIMULATIONS_BEFORE_RESTART == 0:
                # Restart Dymola at regular intervals.
                if dymola is not None:
                    # Drop the reference first so the final cleanup does not
                    # close the same instance a second time if close() fails.
                    stale, dymola = dymola, None
                    stale.close()

                if DymolaInterface is None:
                    raise ImportError(
                        "The Dymola Python interface (dymola.dymola_interface) "
                        "could not be imported; check PYTHONPATH."
                    )

                dymola = DymolaInterface()
                dymola.openModel(path=os.path.join(aixlib_path, "package.mo"))
                dymola.openModel(os.path.join(model_path, prj.name, "package.mo"))
                dymola.cd(Dir=dir_temp)

            logger.info("simulate building {} in progres".format(bldg.name))

            model_name = "%s.%s.%s" % (prj.name, bldg.name, bldg.name)
            dymola.translateModel(model_name)

            # Simulate the model
            output = dymola.simulateExtendedModel(
                problem=model_name,
                startTime=start_time,
                stopTime=stop_time,
                outputInterval=output_interval,
                method=method,
                tolerance=tolerance,
                resultFile=os.path.join(dir_result, bldg.name),
            )

            # The first entry of the returned list is the success flag.
            if not output[0]:
                logger.error("Simulation failed. Below is the translation log.")
                logger.error(dymola.getLastError())
    finally:
        # Never leave a Dymola instance running, even after an exception.
        if dymola is not None:
            dymola.close()
def queue_simulation(
    sim_function,
    prj,
    number_of_workers=multiprocessing.cpu_count() - 1,
    model_path=os.path.join(os.path.expanduser("~"), "TEASEROutput"),
    results_path=os.path.join(os.path.expanduser("~"), "djangoteaserout"),
    start_time=0.0,
    stop_time=31532400.0,
    output_interval=3600.0,
    method="Dassl",
    tolerance=0.0001,
    aixlib_path=None,
):
    """Simulate multiple buildings in parallel worker processes.

    All buildings in ``prj.buildings`` are distributed over
    ``number_of_workers`` partitions. Buildings whose class is named
    ``Office`` and all other buildings are split separately and then combined
    per partition, so every partition gets a share of both groups. One
    ``WorkerSimulation`` process with its own ``JoinableQueue`` is started per
    non-empty partition, and the function blocks until all of them have
    finished.

    Each worker puts the return value of ``sim_function`` on its queue. This
    function does not read the queues and returns None.

    Note: Please ensure that for all buildings corresponding models have been
    generated. Also ensure that all models are within the same prj, otherwise
    this will neither work for single nor for multiprocessing.

    Parameters
    ----------
    sim_function : callable
        Simulation function, e.g. ``simulate()``.
    prj : Scenario instance
        Scenario instance of the buildings that are simulated. Please ensure
        if you are using multiprocessing that all buildings are within the
        same prj.
    number_of_workers : int
        Number of workers used for the simulation (default: number of
        processors - 1). Values below 1 are raised to 1.
    model_path : str
        Path where Modelica model from TEASER is located. This is TEASER
        output path.
    results_path : str
        Path where Dymola results should be stored.
    start_time : int
        Start time of the simulation
    stop_time : int
        Stop time of the simulation
    output_interval : int
        Output interval of the simulation
    method : str
        Solver used in Dymola (default: 'Dassl').
    tolerance : float
        Tolerance of used solver
    aixlib_path : str
        Path forwarded to ``sim_function`` as ``aixlib_path``
        (default: None).
    """
    # The default evaluates to 0 on a single-core machine, and array_split
    # rejects a section count below 1.
    number_of_workers = max(1, int(number_of_workers))

    list_office = []
    list_other = []
    for bldg in prj.buildings:
        # Matched by class name; presumably to avoid importing TEASER here.
        if type(bldg).__name__ == "Office":
            list_office.append(bldg)
        else:
            list_other.append(bldg)

    sim_office = np.array_split(list_office, number_of_workers)
    sim_other = np.array_split(list_other, number_of_workers)
    simulation_parts = [
        np.append(office_part, other_part)
        for office_part, other_part in zip(sim_office, sim_other)
    ]

    workers = []
    for i, sim_part in enumerate(simulation_parts):
        if len(sim_part) == 0:
            # Fewer buildings than workers: no point in spawning an idle
            # process for an empty partition.
            continue
        workers.append(
            WorkerSimulation(
                sim_function=sim_function,
                sim_part=sim_part,
                prj=prj,
                model_path=model_path,
                results_path=results_path,
                start_time=start_time,
                stop_time=stop_time,
                output_interval=output_interval,
                method=method,
                tolerance=tolerance,
                process_number=i,
                # A fresh queue per worker; `[queue] * n` would hand the very
                # same queue object to every worker.
                result_queue=multiprocessing.JoinableQueue(),
                aixlib_path=aixlib_path,
            )
        )

    for w in workers:
        w.start()  # Start worker

    # NOTE(review): the queues are not drained before joining. That is fine
    # for a sim_function returning None, but a worker that queues a large
    # result may block on exit until the queue is read - confirm before
    # using a sim_function with sizeable return values.
    for w in workers:
        w.join()  # Block until worker has finished