Coverage for ebcpy/simulationapi/dymola_api.py: 64%
602 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-04 08:02 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-01-04 08:02 +0000
1"""Module containing the DymolaAPI used for simulation
2of Modelica-Models."""
4import sys
5import os
6import shutil
7import uuid
8import warnings
9import atexit
10import json
11import time
12import socket
13from pathlib import Path
14from contextlib import closing
15from typing import Union, List
17from pydantic import Field, BaseModel
18import pandas as pd
20from ebcpy import TimeSeriesData
21from ebcpy.modelica import manipulate_ds
22from ebcpy.simulationapi import SimulationSetup, SimulationAPI, \
23 SimulationSetupClass, Variable
24from ebcpy.utils.conversion import convert_tsd_to_modelica_txt
class DymolaSimulationSetup(SimulationSetup):
    """
    Simulation setup used by the DymolaAPI.

    Extends the generic setup by the field ``tolerance`` and
    declares the solver names available for Dymola.
    """
    tolerance: float = Field(
        default=0.0001,
        title="tolerance",
        description="Tolerance of integration"
    )

    # Solver configuration read by the SimulationSetup base class
    # (base class not visible here - presumably used to default and
    # validate the ``solver`` field; confirm against SimulationSetup).
    _default_solver = "Dassl"
    _allowed_solvers = [
        "Dassl", "Euler",
        "Cerk23", "Cerk34", "Cerk45",
        "Esdirk23a", "Esdirk34a", "Esdirk45a",
        "Cvode",
        "Rkfix2", "Rkfix3", "Rkfix4",
        "Lsodar", "Radau",
        "Dopri45", "Dopri853",
        "Sdirk34hw",
    ]
class ExperimentSetupOutput(BaseModel):
    """
    Experiment setup output data model with
    defaults equal to those in Dymola.

    Each field is a boolean switch. The whole model is passed as
    keyword arguments to ``dymola.experimentSetupOutput`` by the
    DymolaAPI, and is built from the ``variables_to_save`` keyword
    of the DymolaAPI constructor.
    """
    # Variable groups to store in the result
    states: bool = True
    derivatives: bool = True
    inputs: bool = True
    outputs: bool = True
    auxiliaries: bool = True
    # Output options; DymolaAPI refuses to log events if ``events`` is False
    equidistant: bool = False
    events: bool = True

    class Config:
        """
        Pydantic internal model settings
        """
        # pylint: disable=too-few-public-methods
        # Reject unknown option names instead of silently ignoring them
        extra = "forbid"
66class DymolaAPI(SimulationAPI):
67 """
68 API to a Dymola instance.
70 :param str,Path working_directory:
71 Dirpath for the current working directory of dymola
72 :param str model_name:
73 Name of the model to be simulated.
74 If None, it has to be provided prior to or when calling simulate().
75 :param list packages:
76 List with path's to the packages needed to simulate the model
77 :keyword Boolean show_window:
78 True to show the Dymola window. Default is False
79 :keyword Boolean modify_structural_parameters:
80 True to automatically set the structural parameters of the
81 simulation model via Modelica modifiers. Default is True.
82 See also the keyword ``structural_parameters``
83 of the ``simulate`` function.
84 :keyword Boolean equidistant_output:
85 If True (Default), Dymola stores variables in an
86 equidistant output and does not store variables at events.
87 :keyword dict[str,bool] variables_to_save:
88 A dictionary to select which variables are going
89 to be stored if the simulation creates .mat files.
90 Options (with the default being all True):
91 - states=True
92 - derivatives=True
93 - inputs=True
94 - outputs=True
95 - auxiliaries=True
96 :keyword int n_restart:
97 Number of iterations after which Dymola should restart.
98 This is done to free memory. Default value -1. For values
99 below 1 Dymola does not restart.
100 :keyword bool extract_variables:
101 If True (the default), all variables of the model will be extracted
102 on init of this class.
103 This requires translating the model.
104 :keyword bool debug:
105 If True (not the default), the dymola instance is not closed
106 on exit of the python script. This allows further debugging in
107 dymola itself if API-functions cause a python error.
108 :keyword str mos_script_pre:
109 Path to a valid mos-script for Modelica/Dymola.
110 If given, the script is executed **prior** to loading any
111 package specified in this API.
112 May be relevant for handling version conflicts.
113 :keyword str mos_script_post:
114 Path to a valid mos-script for Modelica/Dymola.
115 If given, the script is executed before closing Dymola.
116 :keyword str dymola_version:
117 Version of Dymola to use.
118 If not given, newest version will be used.
119 If given, the Version needs to be equal to the folder name
120 of your installation.
122 **Example:** If you have two versions installed at
124 - ``C://Program Files//Dymola 2021`` and
125 - ``C://Program Files//Dymola 2020x``
127 and you want to use Dymola 2020x, specify
128 ``dymola_version='Dymola 2020x'``.
130 This parameter is overwritten if ``dymola_path`` is specified.
131 :keyword str dymola_path:
132 Path to the dymola installation on the device. Necessary
133 e.g. on linux, if we can't find the path automatically.
134 Example: ``dymola_path="C://Program Files//Dymola 2020x"``
135 :keyword str dymola_interface_path:
136 Direct path to the .egg-file of the dymola interface.
137 Only relevant when the dymola_path
138 differs from the interface path.
139 :keyword str dymola_exe_path:
140 Direct path to the dymola executable.
141 Only relevant if the dymola installation does not follow
142 the official guideline.
143 :keyword float time_delay_between_starts:
144 If starting multiple Dymola instances on multiple
145 cores, a time delay between each start avoids weird
146 behaviour, such as requiring to set the C-Compiler again
147 as Dymola overrides the default .dymx setup file.
148 If you start e.g. 20 instances and specify `time_delay_between_starts=5`,
149 each 5 seconds one instance will start, taking in total
150 100 seconds. Default is no delay.
152 Example:
154 >>> import os
155 >>> from ebcpy import DymolaAPI
156 >>> # Specify the model name
157 >>> model_name = "Modelica.Thermal.FluidHeatFlow.Examples.PumpAndValve"
158 >>> dym_api = DymolaAPI(working_directory=os.getcwd(),
159 >>> model_name=model_name,
160 >>> packages=[],
161 >>> show_window=True)
162 >>> dym_api.sim_setup = {"start_time": 100,
163 >>> "stop_time": 200}
164 >>> dym_api.simulate()
165 >>> dym_api.close()
167 """
168 _sim_setup_class: SimulationSetupClass = DymolaSimulationSetup
169 _items_to_drop = ["pool", "dymola", "_dummy_dymola_instance"]
170 dymola = None
171 # Default simulation setup
172 _supported_kwargs = [
173 "show_window",
174 "modify_structural_parameters",
175 "dymola_path",
176 "equidistant_output",
177 "variables_to_save",
178 "n_restart",
179 "debug",
180 "mos_script_pre",
181 "mos_script_post",
182 "dymola_version",
183 "dymola_interface_path",
184 "dymola_exe_path",
185 "time_delay_between_starts"
186 ]
def __init__(
        self,
        working_directory: Union[Path, str],
        model_name: str = None,
        packages: List[Union[Path, str]] = None,
        **kwargs
):
    """
    Instantiate the API: read all keyword options, locate the Dymola
    installation and start the Dymola instance(s).

    :param str,Path working_directory:
        Passed on to the super class constructor.
    :param str model_name:
        Passed on to the super class constructor. If not None, the
        model is updated at the end of init and ``result_names`` is
        set to the names of the model outputs.
    :param list packages:
        Package paths (``str`` or ``Path``), stored as strings in
        ``self.packages``.
    :keyword kwargs:
        Options described in the class docstring, plus ``n_cpu`` and
        ``save_logs`` which are passed to the super class.
        Keys left over after all options were read are logged as
        unsupported; they do not raise.
    :raises FileNotFoundError:
        If a given mos-script or ``dymola_path`` does not exist, or if
        no installation is found and ``dymola_exe_path`` /
        ``dymola_interface_path`` are not both given.
    :raises TypeError:
        If a mos-script does not end with ``.mos``, a package is
        neither ``str`` nor ``Path``, or ``n_restart`` is not an int.
    """
    self.dymola = None  # Avoid key-error in get-state. Instance attribute needs to be there.
    # Update kwargs with regard to what kwargs are supported.
    self.extract_variables = kwargs.pop("extract_variables", True)
    # Stays False until the Dymola instance is up; _update_model checks it.
    self.fully_initialized = False
    self.debug = kwargs.pop("debug", False)
    self.show_window = kwargs.pop("show_window", False)
    self.modify_structural_parameters = kwargs.pop("modify_structural_parameters", True)
    self.equidistant_output = kwargs.pop("equidistant_output", True)
    _variables_to_save = kwargs.pop("variables_to_save", {})
    self.experiment_setup_output = ExperimentSetupOutput(**_variables_to_save)

    self.mos_script_pre = kwargs.pop("mos_script_pre", None)
    self.mos_script_post = kwargs.pop("mos_script_post", None)
    self.dymola_version = kwargs.pop("dymola_version", None)
    self.dymola_interface_path = kwargs.pop("dymola_interface_path", None)
    self.dymola_exe_path = kwargs.pop("dymola_exe_path", None)
    _time_delay_between_starts = kwargs.pop("time_delay_between_starts", 0)
    # Validate both scripts before anything is started.
    for mos_script in [self.mos_script_pre, self.mos_script_post]:
        if mos_script is not None:
            if not os.path.isfile(mos_script):
                raise FileNotFoundError(
                    f"Given mos_script '{mos_script}' does "
                    f"not exist."
                )
            if not str(mos_script).endswith(".mos"):
                raise TypeError(
                    f"Given mos_script '{mos_script}' "
                    f"is not a valid .mos file."
                )

    # Convert to modelica path
    if self.mos_script_pre is not None:
        self.mos_script_pre = self._make_modelica_normpath(self.mos_script_pre)
    if self.mos_script_post is not None:
        self.mos_script_post = self._make_modelica_normpath(self.mos_script_post)

    super().__init__(working_directory=working_directory,
                     model_name=model_name,
                     n_cpu=kwargs.pop("n_cpu", 1),
                     save_logs=kwargs.pop("save_logs", True))

    # First import the dymola-interface
    dymola_path = kwargs.pop("dymola_path", None)
    if dymola_path is not None:
        if not os.path.exists(dymola_path):
            raise FileNotFoundError(f"Given path '{dymola_path}' can not be found on "
                                    "your machine.")
    else:
        # Get the dymola-install-path:
        _dym_installations = self.get_dymola_install_paths()
        if _dym_installations:
            if self.dymola_version:
                dymola_path = _get_dymola_path_of_version(
                    dymola_installations=_dym_installations,
                    dymola_version=self.dymola_version
                )
            else:
                dymola_path = _dym_installations[0]  # 0 is the newest
            self.logger.info("Using dymola installation at %s", dymola_path)
        else:
            # No installation found: only fine if the user gave both explicit paths.
            if self.dymola_exe_path is None or self.dymola_interface_path is None:
                raise FileNotFoundError(
                    "Could not find dymola on your machine. "
                    "Thus, not able to find the `dymola_exe_path` and `dymola_interface_path`. "
                    "Either specify both or pass an existing `dymola_path`."
                )
    # May still be None here if both explicit paths were given.
    self.dymola_path = dymola_path
    if self.dymola_exe_path is None:
        self.dymola_exe_path = self.get_dymola_exe_path(dymola_path)
    self.logger.info("Using dymola.exe: %s", self.dymola_exe_path)
    if self.dymola_interface_path is None:
        self.dymola_interface_path = self.get_dymola_interface_path(dymola_path)
    self.logger.info("Using dymola interface: %s", self.dymola_interface_path)

    # Store all packages as plain strings.
    self.packages = []
    if packages is not None:
        for package in packages:
            if isinstance(package, Path):
                self.packages.append(str(package))
            elif isinstance(package, str):
                self.packages.append(package)
            else:
                raise TypeError(f"Given package is of type {type(package)}"
                                f" but should be any valid path.")

    # Import n_restart
    self.sim_counter = 0
    self.n_restart = kwargs.pop("n_restart", -1)
    if not isinstance(self.n_restart, int):
        raise TypeError(f"n_restart has to be type int but "
                        f"is of type {type(self.n_restart)}")

    self._dummy_dymola_instance = None  # Ensure self._close_dummy gets the attribute.
    if self.n_restart > 0:
        self.logger.info("Open blank placeholder Dymola instance to ensure"
                         " a licence during Dymola restarts")
        # Use standard port allocation, should always work
        self._dummy_dymola_instance = self._open_dymola_interface(port=-1)
        atexit.register(self._close_dummy)

    # Parameter for raising a warning if to many dymola-instances are running
    self._critical_number_instances = 10 + self.n_cpu
    # Register the function now in case of an error.
    if not self.debug:
        atexit.register(self.close)
    if self.use_mp:
        # One port and one start delay (i * delay) per worker.
        ports = _get_n_available_ports(n_ports=self.n_cpu)
        self.pool.map(
            self._setup_dymola_interface,
            [dict(use_mp=True, port=port, time_delay=i * _time_delay_between_starts)
             for i, port in enumerate(ports)]
        )
    # For translation etc. always setup a default dymola instance
    self.dymola = self._setup_dymola_interface(dict(use_mp=False))
    if not self.license_is_available():
        warnings.warn("You have no licence to use Dymola. "
                      "Hence you can only simulate models with 8 or less equations.")
    # Update experiment setup output
    self.update_experiment_setup_output(self.experiment_setup_output)
    self.fully_initialized = True
    # Trigger on init.
    if model_name is not None:
        self._update_model()
        # Set result_names to output variables.
        self.result_names = list(self.outputs.keys())

    # Check if some kwargs are still present. If so, inform the user about
    # false usage of kwargs:
    if kwargs:
        self.logger.error(
            "You passed the following kwargs which "
            "are not part of the supported kwargs and "
            "have thus no effect: %s.", " ,".join(list(kwargs.keys())))
331 def _update_model(self):
332 # Translate the model and extract all variables,
333 # if the user wants to:
334 if self.extract_variables and self.fully_initialized:
335 self.extract_model_variables()
def simulate(self,
             parameters: Union[dict, List[dict]] = None,
             return_option: str = "time_series",
             **kwargs):
    """
    Simulate the given parameters.

    Prepares the Dymola specific keyword arguments and then hands
    over to the ``simulate`` method of the super class.

    Additional settings:

    :keyword List[str] model_names:
        List of Dymola model-names to simulate. Should be either the size
        of parameters or parameters needs to be sized 1.
        Keep in mind that different models may use different parameters!
    :keyword Boolean show_eventlog:
        Default False. True to show eventlog of simulation (advanced)
    :keyword Boolean squeeze:
        Default True. If only one set of initialValues is provided,
        a DataFrame is returned directly instead of a list.
    :keyword str table_name:
        If inputs are given, you have to specify the name of the table
        in the instance of CombiTimeTable. In order for the inputs to
        work the value should be equal to the value of 'tableName' in Modelica.
    :keyword str file_name:
        If inputs are given, you have to specify the file_name of the table
        in the instance of CombiTimeTable. In order for the inputs to
        work the value should be equal to the value of 'fileName' in Modelica.
    :keyword callable postprocess_mat_result:
        When choosing return_option savepath and no equidistant output, the mat files may take up
        a lot of disk space while you are only interested in some variables or parts
        of the simulation results. This features enables you to pass any function which
        gets the mat-path as an input and returns some result you are interested in.
        The function signature is `foo(mat_result_file, **kwargs_postprocessing) -> Any`.
        Be sure to define the function in a global scope to allow multiprocessing.
    :keyword dict kwargs_postprocessing:
        Keyword arguments used in the function `postprocess_mat_result`.
    :keyword List[str] structural_parameters:
        A list containing all parameter names which are structural in Modelica.
        This means a modifier has to be created in order to change
        the value of this parameter. Internally, the given list
        is added to the known states of the model. Hence, you only have to
        specify this keyword argument if your structural parameter
        does not appear in the dsin.txt file created during translation.
        ``None`` or an empty list is treated as "no structural parameters".

    :raises TypeError:
        If ``model_names`` is given but is not a list.

    Example:
    Changing a record in a model:

    >>> sim_api.simulate(
    >>>     parameters={"parameterPipe": "AixLib.DataBase.Pipes.PE_X.DIN_16893_SDR11_d160()"},
    >>>     structural_parameters=["parameterPipe"])

    """
    # Handle special case for structural_parameters
    if "structural_parameters" in kwargs:
        _struc_params = kwargs["structural_parameters"]
        if not _struc_params:
            # None or an empty list: indexing [0] would fail, so
            # normalise to one empty set of structural parameters.
            kwargs["structural_parameters"] = [[]]
        elif not isinstance(_struc_params[0], list):
            # Input is 1-dimensional. Make it 2-dimensional to avoid
            # list flattening in the super method.
            kwargs["structural_parameters"] = [_struc_params]
    if "model_names" in kwargs:
        model_names = kwargs["model_names"]
        if not isinstance(model_names, list):
            raise TypeError("model_names needs to be a list.")
        if isinstance(parameters, dict):
            # Make an array of parameters to enable correct use of super function.
            parameters = [parameters] * len(model_names)
        if parameters is None:
            parameters = [{}] * len(model_names)
    return super().simulate(parameters=parameters, return_option=return_option, **kwargs)
407 def _single_simulation(self, kwargs):
408 # Unpack kwargs
409 show_eventlog = kwargs.pop("show_eventlog", False)
410 squeeze = kwargs.pop("squeeze", True)
411 result_file_name = kwargs.pop("result_file_name", 'resultFile')
412 parameters = kwargs.pop("parameters")
413 return_option = kwargs.pop("return_option")
414 model_names = kwargs.pop("model_names", None)
415 inputs = kwargs.pop("inputs", None)
416 fail_on_error = kwargs.pop("fail_on_error", True)
417 structural_parameters = kwargs.pop("structural_parameters", [])
418 table_name = kwargs.pop("table_name", None)
419 file_name = kwargs.pop("file_name", None)
420 savepath = kwargs.pop("savepath", None)
422 def empty_postprocessing(mat_result, **_kwargs):
423 return mat_result
425 postprocess_mat_result = kwargs.pop("postprocess_mat_result", empty_postprocessing)
426 kwargs_postprocessing = kwargs.pop("kwargs_postprocessing", {})
427 if kwargs:
428 self.logger.error(
429 "You passed the following kwargs which "
430 "are not part of the supported kwargs and "
431 "have thus no effect: %s.", " ,".join(list(kwargs.keys())))
433 # Handle multiprocessing
434 if self.use_mp:
435 idx_worker = self.worker_idx
436 if self.dymola is None:
437 # This should not affect #119, as this rarely happens. Thus, the
438 # method used in the DymolaInterface should work.
439 self._setup_dymola_interface(dict(use_mp=True))
441 # Re-set the dymola experiment output if API is newly started
442 self.dymola.experimentSetupOutput(**self.experiment_setup_output.model_dump())
444 # Handle eventlog
445 if show_eventlog:
446 if not self.experiment_setup_output.events:
447 raise ValueError("You can't log events and have an "
448 "equidistant output, set equidistant output=False")
449 self.dymola.ExecuteCommand("Advanced.Debug.LogEvents = true")
450 self.dymola.ExecuteCommand("Advanced.Debug.LogEventsInitialization = true")
452 # Restart Dymola after n_restart iterations
453 self._check_restart()
455 # Handle custom model_names
456 if model_names is not None:
457 # Custom model_name setting
458 _res_names = self.result_names.copy()
459 self._model_name = model_names
460 self._update_model_variables()
461 if _res_names != self.result_names:
462 self.logger.info(
463 "Result names changed due to setting the new model. "
464 "If you do not expect custom result names, ignore this warning."
465 "If you do expect them, please raise an issue to add the "
466 "option when using the model_names keyword.")
467 self.logger.info(
468 "Difference: %s",
469 " ,".join(list(set(_res_names).difference(self.result_names)))
470 )
472 if self.model_name is None:
473 raise ValueError(
474 "You neither passed a model_name when "
475 "starting DymolaAPI, nor when calling simulate. "
476 "Can't simulate no model."
477 )
479 # Handle parameters:
480 if parameters is None:
481 parameters = {}
482 unsupported_parameters = False
483 else:
484 unsupported_parameters = self.check_unsupported_variables(
485 variables=list(parameters.keys()),
486 type_of_var="parameters"
487 )
489 # Handle structural parameters
491 if (unsupported_parameters and
492 (self.modify_structural_parameters or
493 structural_parameters)):
494 # Alter the model_name for the next simulation
495 model_name, parameters_new = self._alter_model_name(
496 parameters=parameters,
497 model_name=self.model_name,
498 structural_params=list(self.states.keys()) + structural_parameters
499 )
500 # Trigger translation only if something changed
501 if model_name != self.model_name:
502 _res_names = self.result_names.copy()
503 self.model_name = model_name
504 self.result_names = _res_names # Restore previous result names
505 self.logger.warning(
506 "Warning: Currently, the model is re-translating "
507 "for each simulation. You should add to your Modelica "
508 "parameters \"annotation(Evaluate=false)\".\n "
509 "Check for these parameters: %s",
510 ', '.join(set(parameters.keys()).difference(parameters_new.keys()))
511 )
512 parameters = parameters_new
513 # Check again
514 unsupported_parameters = self.check_unsupported_variables(
515 variables=list(parameters.keys()),
516 type_of_var="parameters"
517 )
519 initial_names = list(parameters.keys())
520 initial_values = list(parameters.values())
521 # Convert to float for Boolean and integer types:
522 try:
523 initial_values = [float(v) for v in initial_values]
524 except (ValueError, TypeError) as err:
525 raise TypeError("Dymola only accepts float values. "
526 "Could bot automatically convert the given "
527 "parameter values to float.") from err
529 # Handle inputs
530 if inputs is not None:
531 # Unpack additional kwargs
532 if table_name is None or file_name is None:
533 raise KeyError("For inputs to be used by DymolaAPI.simulate, you "
534 "have to specify the 'table_name' and the 'file_name' "
535 "as keyword arguments of the function. These must match"
536 "the values 'tableName' and 'fileName' in the CombiTimeTable"
537 " model in your modelica code.") from err
538 # Generate the input in the correct format
539 offset = self.sim_setup.start_time - inputs.index[0]
540 filepath = convert_tsd_to_modelica_txt(
541 tsd=inputs,
542 table_name=table_name,
543 save_path_file=file_name,
544 offset=offset
545 )
546 self.logger.info("Successfully created Dymola input file at %s", filepath)
548 if return_option == "savepath":
549 if unsupported_parameters:
550 raise KeyError("Dymola does not accept invalid parameter "
551 "names for option return_type='savepath'. "
552 "To use this option, delete unsupported "
553 "parameters from your setup.")
554 res = self.dymola.simulateExtendedModel(
555 self.model_name,
556 startTime=self.sim_setup.start_time,
557 stopTime=self.sim_setup.stop_time,
558 numberOfIntervals=0,
559 outputInterval=self.sim_setup.output_interval,
560 method=self.sim_setup.solver,
561 tolerance=self.sim_setup.tolerance,
562 fixedstepsize=self.sim_setup.fixedstepsize,
563 resultFile=result_file_name,
564 initialNames=initial_names,
565 initialValues=initial_values)
566 else:
567 if not parameters and not self.parameters:
568 raise ValueError(
569 "Sadly, simulating a model in Dymola "
570 "with no parameters returns no result. "
571 "Call this function using return_option='savepath' to get the results."
572 )
573 if not parameters:
574 random_name = list(self.parameters.keys())[0]
575 initial_values = [self.parameters[random_name].value]
576 initial_names = [random_name]
578 # Handle 1 and 2 D initial names:
579 # Convert a 1D list to 2D list
580 if initial_values and isinstance(initial_values[0], (float, int)):
581 initial_values = [initial_values]
583 # Handle the time of the simulation:
584 res_names = self.result_names.copy()
585 if "Time" not in res_names:
586 res_names.append("Time")
588 # Internally convert output Interval to number of intervals
589 # (Required by function simulateMultiResultsModel
590 number_of_intervals = (self.sim_setup.stop_time - self.sim_setup.start_time) / \
591 self.sim_setup.output_interval
592 if int(number_of_intervals) != number_of_intervals:
593 raise ValueError(
594 "Given output_interval and time interval did not yield "
595 "an integer numberOfIntervals. To use this functions "
596 "without savepaths, you have to provide either a "
597 "numberOfIntervals or a value for output_interval "
598 "which can be converted to numberOfIntervals.")
600 res = self.dymola.simulateMultiResultsModel(
601 self.model_name,
602 startTime=self.sim_setup.start_time,
603 stopTime=self.sim_setup.stop_time,
604 numberOfIntervals=int(number_of_intervals),
605 method=self.sim_setup.solver,
606 tolerance=self.sim_setup.tolerance,
607 fixedstepsize=self.sim_setup.fixedstepsize,
608 resultFile=None,
609 initialNames=initial_names,
610 initialValues=initial_values,
611 resultNames=res_names)
613 if not res[0]:
614 self.logger.error("Simulation failed!")
615 self.logger.error("The last error log from Dymola:")
616 log = self.dymola.getLastErrorLog()
617 # Only print first part as output is sometimes to verbose.
618 self.logger.error(log[:10000])
619 dslog_path = self.working_directory.joinpath('dslog.txt')
620 try:
621 with open(dslog_path, "r") as dslog_file:
622 dslog_content = dslog_file.read()
623 self.logger.error(dslog_content)
624 except Exception:
625 dslog_content = "Not retreivable. Open it yourself."
626 msg = f"Simulation failed: Reason according " \
627 f"to dslog, located at '{dslog_path}': {dslog_content}"
628 if fail_on_error:
629 raise Exception(msg)
630 # Don't raise and return None
631 self.logger.error(msg)
632 return None
634 if return_option == "savepath":
635 _save_name_dsres = f"{result_file_name}.mat"
636 # Get the working_directory of the current dymola instance
637 self.dymola.cd()
638 # Get the value and convert it to a 100 % fitting str-path
639 dymola_working_directory = str(Path(self.dymola.getLastErrorLog().replace("\n", "")))
640 mat_working_directory = os.path.join(dymola_working_directory, _save_name_dsres)
641 if savepath is None or str(savepath) == dymola_working_directory:
642 mat_result_file = mat_working_directory
643 else:
644 mat_save_path = os.path.join(savepath, _save_name_dsres)
645 os.makedirs(savepath, exist_ok=True)
646 # Copying dslogs and dsfinals can lead to errors,
647 # as the names are not unique
648 # for filename in [_save_name_dsres, "dslog.txt", "dsfinal.txt"]:
649 # Delete existing files
650 try:
651 os.remove(mat_save_path)
652 except OSError:
653 pass
654 # Move files
655 shutil.copy(mat_working_directory, mat_save_path)
656 os.remove(mat_working_directory)
657 mat_result_file = mat_save_path
658 result_file = postprocess_mat_result(mat_result_file, **kwargs_postprocessing)
659 return result_file
661 data = res[1] # Get data
662 if return_option == "last_point":
663 results = []
664 for ini_val_set in data:
665 results.append({result_name: ini_val_set[idx][-1] for idx, result_name
666 in enumerate(res_names)})
667 if len(results) == 1 and squeeze:
668 return results[0]
669 return results
670 # Else return as dataframe.
671 dfs = []
672 for ini_val_set in data:
673 df = pd.DataFrame({result_name: ini_val_set[idx] for idx, result_name
674 in enumerate(res_names)})
675 # Set time index
676 df = df.set_index("Time")
677 # Convert it to float
678 df.index = df.index.astype("float64")
679 dfs.append(df)
680 # Most of the cases, only one set is provided. In that case, avoid
681 if len(dfs) == 1 and squeeze:
682 return TimeSeriesData(dfs[0], default_tag="sim")
683 return [TimeSeriesData(df, default_tag="sim") for df in dfs]
def translate(self):
    """
    Translate the current model via ``dymola.translateModel()``.

    On failure, Dymola's last error log is written to the logger
    and an exception is raised.

    :raises Exception: If Dymola reports a failed translation.
    """
    translated = self.dymola.translateModel(self.model_name)
    if translated:
        return
    log_error = self.logger.error
    log_error("Translation failed!")
    log_error("The last error log from Dymola:")
    log_error(self.dymola.getLastErrorLog())
    raise Exception("Translation failed - Aborting")
def set_compiler(self, name, path, dll=False, dde=False, opc=False):
    """
    Set up the compiler and compiler options on Windows.
    Optional: Specify if you want to enable dll, dde or opc.

    :param str name:
        Name of the compiler (case-insensitive), available options:
        - 'vs': Visual Studio
        - 'gcc': GCC
    :param str,os.path.normpath path:
        Path to the compiler files.
        Example for name='vs': path='C:/Program Files (x86)/Microsoft Visual Studio 10.0/Vc'
        Example for name='gcc': path='C:/MinGW/bin/gcc'
    :param Boolean dll:
        Set option for dll support. Check Dymolas Manual on what this exactly does.
    :param Boolean dde:
        Set option for dde support. Check Dymolas Manual on what this exactly does.
    :param Boolean opc:
        Set option for opc support. Check Dymolas Manual on what this exactly does.
    :return: True, on success.
    :raises OSError: If not running on Windows.
    :raises ValueError:
        If the compiler name is not supported or multiprocessing is used.
    :raises FileNotFoundError: If the given path does not exist.
    """
    # Lookup dict for internal name of CCompiler-Variable
    _name_int = {"vs": "MSVC",
                 "gcc": "GCC"}

    # A substring test for "win" would also accept "darwin" (macOS),
    # hence check the prefix of the platform identifier.
    if not sys.platform.startswith("win"):
        raise OSError(f"set_compiler function only implemented "
                      f"for windows systems, you are using {sys.platform}")
    # Manually check correct input as Dymola's error are not a help
    name = name.lower()
    if name not in _name_int:
        raise ValueError(f"Given compiler name {name} not supported.")
    if not os.path.exists(path):
        raise FileNotFoundError(f"Given compiler path {path} does not exist on your machine.")
    # Convert path for correct input
    path = self._make_modelica_normpath(path)
    if self.use_mp:
        raise ValueError("Given function is not yet supported for multiprocessing")

    res = self.dymola.SetDymolaCompiler(name,
                                        [f"CCompiler={_name_int[name]}",
                                         f"{_name_int[name]}DIR={path}",
                                         f"DLL={int(dll)}",
                                         f"DDE={int(dde)}",
                                         f"OPC={int(opc)}"])

    return res
def import_initial(self, filepath):
    """
    Load the given dsfinal.txt into Dymola.

    :param str,os.path.normpath filepath:
        Path to the dsfinal.txt to be loaded
    :raises FileNotFoundError: If ``filepath`` is not an existing file.
    :raises TypeError: If the file extension is not ``.txt``.
    :raises ValueError: If multiprocessing is used.
    :raises Exception: If Dymola reports that the import failed.
    """
    if not os.path.isfile(filepath):
        raise FileNotFoundError(f"Given filepath {filepath} does not exist")
    _, extension = os.path.splitext(filepath)
    if extension != ".txt":
        raise TypeError('File is not of type .txt')
    if self.use_mp:
        raise ValueError("Given function is not yet supported for multiprocessing")
    imported = self.dymola.importInitial(dsName=filepath)
    if not imported:
        raise Exception("Could not load dsfinal into Dymola.")
    self.logger.info("Successfully loaded dsfinal.txt")
@SimulationAPI.working_directory.setter
def working_directory(self, working_directory: Union[Path, str]):
    """
    Store the new working directory and, if Dymola is already
    running, change the directory of the main Dymola instance as well.
    """
    new_directory = (
        Path(working_directory)
        if isinstance(working_directory, str)
        else working_directory
    )
    self._working_directory = new_directory
    if self.dymola is not None:
        # Dymola was started already: keep the main instance in sync
        self.set_dymola_cd(dymola=self.dymola,
                           cd=new_directory)
        if self.use_mp:
            self.logger.warning("Won't set the working_directory for all workers, "
                                "not yet implemented.")
@SimulationAPI.cd.setter
def cd(self, cd):
    """Deprecated alias: warns and forwards to ``working_directory``."""
    deprecation_note = (
        "cd was renamed to working_directory in all classes. "
        "Use working_directory instead."
    )
    warnings.warn(deprecation_note, category=DeprecationWarning)
    self.working_directory = cd
def set_dymola_cd(self, dymola, cd):
    """
    Change the current directory of the given Dymola instance.

    The directory is created if it is missing and converted to a
    modelica-normpath before it is handed to Dymola.

    :raises OSError: If Dymola reports that the change failed.
    """
    os.makedirs(cd, exist_ok=True)
    changed = dymola.cd(self._make_modelica_normpath(path=cd))
    if changed:
        return
    raise OSError(f"Could not change working directory to {cd}")
def close(self):
    """Close Dymola: first whatever the super class closes, then the main instance."""
    # Multiprocessing resources are handled by the super class
    super().close()
    # The main instance is closed in any case
    main_instance = self.dymola
    self._single_close(dymola=main_instance)
def _close_multiprocessing(self, _):
    """
    Close the Dymola instance of a multiprocessing worker.

    The positional argument is ignored.
    """
    self._single_close()
    # Workers keep their instance on the class (see _setup_dymola_interface),
    # so the class attribute has to be reset as well.
    DymolaAPI.dymola = None
807 def _single_close(self, **kwargs):
808 """Closes a single dymola instance"""
809 if self.dymola is None:
810 return # Already closed prior
811 # Execute the mos-script if given:
812 if self.mos_script_post is not None:
813 self.logger.info("Executing given mos_script_post "
814 "prior to closing.")
815 self.dymola.RunScript(self.mos_script_post)
816 self.logger.info("Output of mos_script_post: %s", self.dymola.getLastErrorLog())
817 self.logger.info('Closing Dymola')
818 self.dymola.close()
819 self.logger.info('Successfully closed Dymola')
820 self.dymola = None
822 def _close_dummy(self):
823 """
824 Closes dummy instance at the end of the execution
825 """
826 if self._dummy_dymola_instance is not None:
827 self.logger.info('Closing dummy Dymola instance')
828 self._dummy_dymola_instance.close()
829 self.logger.info('Successfully closed dummy Dymola instance')
def extract_model_variables(self):
    """
    Extract all variables of the model by
    translating it and then processing the dsin
    using the manipulate_ds module.

    Every row of the dsin.txt becomes a ``Variable`` which is stored
    by its index in ``self.parameters``, ``self.inputs``,
    ``self.outputs`` or ``self.states``, depending on column "5".
    """
    # Translate model
    self.logger.info("Translating model '%s' to extract model variables ",
                     self.model_name)
    self.translate()
    # Get path to dsin. Use working_directory, as ``cd`` is the
    # deprecated name of the same setting.
    dsin_path = os.path.join(self.working_directory, "dsin.txt")
    df = manipulate_ds.convert_ds_file_to_dataframe(dsin_path)
    # Convert and return all parameters of dsin to initial values and names
    for idx, row in df.iterrows():
        # Columns used here: "2" value, "3" minimum, "4" maximum, "5" category
        _max = float(row["4"])
        _min = float(row["3"])
        if _min >= _max:
            # No usable range given: store the value without bounds
            _var_ebcpy = Variable(value=float(row["2"]))
        else:
            _var_ebcpy = Variable(
                min=_min,
                max=_max,
                value=float(row["2"])
            )
        if row["5"] == "1":
            self.parameters[idx] = _var_ebcpy
        elif row["5"] == "5":
            self.inputs[idx] = _var_ebcpy
        elif row["5"] == "4":
            self.outputs[idx] = _var_ebcpy
        else:
            self.states[idx] = _var_ebcpy
865 def _setup_dymola_interface(self, kwargs: dict):
866 """Load all packages and change the current working directory"""
867 use_mp = kwargs["use_mp"]
868 port = kwargs.get("port", -1)
869 time_delay = kwargs.get("time_delay", 0)
870 time.sleep(time_delay)
871 dymola = self._open_dymola_interface(port=port)
872 self._check_dymola_instances()
873 if use_mp:
874 cd = os.path.join(self.cd, f"worker_{self.worker_idx}")
875 else:
876 cd = self.cd
877 # Execute the mos-script if given:
878 if self.mos_script_pre is not None:
879 self.logger.info("Executing given mos_script_pre "
880 "prior to loading packages.")
881 dymola.RunScript(self.mos_script_pre)
882 self.logger.info("Output of mos_script_pre: %s", dymola.getLastErrorLog())
884 # Set the cd in the dymola api
885 self.set_dymola_cd(dymola=dymola, cd=cd)
887 for package in self.packages:
888 self.logger.info("Loading Model %s", os.path.dirname(package).split("\\")[-1])
889 res = dymola.openModel(package, changeDirectory=False)
890 if not res:
891 raise ImportError(dymola.getLastErrorLog())
892 self.logger.info("Loaded modules")
894 dymola.experimentSetupOutput(**self.experiment_setup_output.dict())
895 if use_mp:
896 DymolaAPI.dymola = dymola
897 return None
898 return dymola
def update_experiment_setup_output(self, experiment_setup_output: Union[ExperimentSetupOutput, dict]):
    """
    Store a new ExperimentSetupOutput and forward it to Dymola. It
    selects which variables are going to be saved. If
    ``self.equidistant_output`` is set, the options `equidistant`
    and `events` of the stored setup are overridden.

    :param (ExperimentSetupOutput, dict) experiment_setup_output:
        An instance of ExperimentSetupOutput or a dict with valid keys for it.
    """
    self.experiment_setup_output = (
        ExperimentSetupOutput(**experiment_setup_output)
        if isinstance(experiment_setup_output, dict)
        else experiment_setup_output
    )
    if self.equidistant_output:
        # Equidistant output without events ensures that all
        # simulation results share the same array shape.
        self.experiment_setup_output.equidistant = True
        self.experiment_setup_output.events = False
    if self.dymola is not None:
        self.dymola.experimentSetupOutput(**self.experiment_setup_output.model_dump())
def license_is_available(self, option: str = "Standard"):
    """
    Ask the running Dymola instance for a license option.

    :param str option:
        Name of the option passed to ``RequestOption``.
    :return:
        Result of ``RequestOption``. If no Dymola instance is stored,
        a warning is issued and False is returned.
    """
    if self.dymola is not None:
        return self.dymola.RequestOption(option)
    warnings.warn("You want to check the license before starting dymola, this is not supported.")
    return False
930 def _open_dymola_interface(self, port):
931 """Open an instance of dymola and return the API-Object"""
932 if self.dymola_interface_path not in sys.path:
933 sys.path.insert(0, self.dymola_interface_path)
934 try:
935 from dymola.dymola_interface import DymolaInterface
936 from dymola.dymola_exception import DymolaConnectionException
937 return DymolaInterface(showwindow=self.show_window,
938 dymolapath=self.dymola_exe_path,
939 port=port)
940 except ImportError as error:
941 raise ImportError("Given dymola-interface could not be "
942 "loaded:\n %s" % self.dymola_interface_path) from error
943 except DymolaConnectionException as error:
944 raise ConnectionError(error) from error
def to_dict(self):
    """
    Collect the most relevant information of this class
    in a dictionary. This may be used for future configuration.

    Besides ``cd``, ``packages``, ``model_name`` and ``type``, every
    name listed in ``_supported_kwargs`` is added with the value found
    in the instance dictionary (None if it is not set).

    :return: dict config:
        Dictionary with keys to re-init this class.
    """
    # Paths are stored as str to enable json-dumping
    config = dict(
        cd=str(self.cd),
        packages=[str(pack) for pack in self.packages],
        model_name=self.model_name,
        type="DymolaAPI",
    )
    instance_values = self.__dict__
    for kwarg in self._supported_kwargs:
        config[kwarg] = instance_values.get(kwarg, None)
    return config
def get_packages(self):
    """
    Get the directories of the packages currently loaded in Dymola.

    Dymola is asked for all classes in the top-level package. If that
    call returns None, the names are derived from the entries of
    ``self.packages`` which point to a ``package.mo``. For each name
    the ``package.order`` resource is resolved via Dymola. Packages
    for which no string is returned are logged and skipped, and only
    paths which exist as file are kept.

    :return: list valid_packages:
        ``Path`` of the folder containing the ``package.order`` of
        every package that could be resolved.
    """
    packages = self.dymola.ExecuteCommand(
        'ModelManagement.Structure.AST.Misc.ClassesInPackage("")'
    )
    if packages is None:
        self.logger.error("Could not load packages from Dymola, using self.packages")
        packages = [
            Path(pack).parent.name
            for pack in self.packages
            if Path(pack).name == "package.mo"
        ]
    valid_packages = []
    for pack in packages:
        current_package = f"modelica://{pack}/package.order"
        pack_path = self.dymola.ExecuteCommand(
            f'Modelica.Utilities.Files.loadResource("{current_package}")'
        )
        if not isinstance(pack_path, str):
            self.logger.error("Could not load model resource for package %s", pack)
            # Without a path there is nothing to check. Passing e.g. None
            # on to os.path.isfile would raise a TypeError.
            continue
        if os.path.isfile(pack_path):
            valid_packages.append(Path(pack_path).parent)
    return valid_packages
def save_for_reproduction(
        self,
        title: str,
        path: Path = None,
        files: list = None,
        save_total_model: bool = True,
        export_fmu: bool = True,
        **kwargs
):
    """
    Additionally to the basic reproduction, add info
    for Dymola packages.

    Content which is saved:
    - DymolaAPI configuration
    - Information on Dymola: Version, flags
    - All loaded packages
    - Total model, if save_total_model = True
    - FMU, if export_fmu = True

    :param str title:
        Passed on to ``save_for_reproduction`` of the super class.
    :param Path path:
        Passed on to ``save_for_reproduction`` of the super class.
    :param list files:
        Files to save in addition. The given list is not modified,
        the Dymola related files are added to a copy.
    :param bool save_total_model:
        True to save the total model
    :param bool export_fmu:
        True to export the FMU of the current model.
    :keyword kwargs:
        Passed on to ``save_for_reproduction`` of the super class.
    :return:
        Whatever ``save_for_reproduction`` of the super class returns.
    """
    # Local import to require git-package only when called
    # pylint: disable=import-outside-toplevel
    from ebcpy.utils.reproduction import ReproductionFile, CopyFile, get_git_information

    # Work on a copy, the list of the caller must not grow as a side effect.
    files = [] if files is None else list(files)
    # DymolaAPI Info:
    files.append(ReproductionFile(
        filename="Dymola/DymolaAPI_config.json",
        content=json.dumps(self.to_dict(), indent=2)
    ))
    # Dymola info: list() writes the flags into the log, which is read afterwards.
    self.dymola.ExecuteCommand("list();")
    _flags = self.dymola.getLastErrorLog()
    dymola_info = [
        # str() keeps the join below working even if a command
        # does not return a string.
        str(self.dymola.ExecuteCommand("DymolaVersion()")),
        str(self.dymola.ExecuteCommand("DymolaVersionNumber()")),
        "\n\n"
    ]
    files.append(ReproductionFile(
        filename="Dymola/DymolaInfo.txt",
        content="\n".join(dymola_info) + _flags
    ))

    # Packages
    package_infos = []
    for pack_path in self.get_packages():
        package_info = str(pack_path)
        # Walk up from the package folder until git information is found.
        for pack_dir_parent in [pack_path] + list(pack_path.parents):
            repo_info = get_git_information(
                path=pack_dir_parent,
                zip_folder_path="Dymola"
            )
            if not repo_info:
                continue

            files.extend(repo_info.pop("difference_files"))
            package_info += "; " + "; ".join(
                [f"{key}: {value}" for key, value in repo_info.items()]
            )
            break
        package_infos.append(package_info)
    files.append(ReproductionFile(
        filename="Dymola/Modelica_packages.txt",
        content="\n".join(package_infos)
    ))
    # Total model
    if save_total_model:
        _total_model_name = f"Dymola/{self.model_name.replace('.', '_')}_total.mo"
        _total_model = Path(self.cd).joinpath(_total_model_name)
        os.makedirs(_total_model.parent, exist_ok=True)  # Create to ensure model can be saved.
        if "(" in self.model_name:
            # The model name carries modifiers: wrap it in a temporary
            # model extending it, so a plain model name can be saved.
            temp_model_file = Path(self.cd).joinpath(f"temp_total_model_{uuid.uuid4()}.mo")
            temp_mode_name = f"{self.model_name.split('(')[0].split('.')[-1]}WithModifier"
            with open(temp_model_file, "w") as file:
                file.write(f"model {temp_mode_name}\n  extends {self.model_name};\nend {temp_mode_name};")
            try:
                res = self.dymola.openModel(str(temp_model_file), changeDirectory=False)
            finally:
                # Remove the temporary file even if opening it raises.
                os.remove(temp_model_file)
            if not res:
                self.logger.error(
                    "Could not create separate model for model with modifiers: %s",
                    self.model_name
                )
                model_name_to_save = self.model_name
            else:
                model_name_to_save = temp_mode_name
        else:
            model_name_to_save = self.model_name
        res = self.dymola.saveTotalModel(
            fileName=str(_total_model),
            modelName=model_name_to_save
        )
        if res:
            files.append(ReproductionFile(
                filename=_total_model_name,
                content=_total_model.read_text()
            ))
            os.remove(_total_model)
        else:
            self.logger.error("Could not save total model: %s",
                              self.dymola.getLastErrorLog())
    # FMU
    if export_fmu:
        _fmu_path = self._save_to_fmu(fail_on_error=False)
        if _fmu_path is not None:
            files.append(CopyFile(
                sourcepath=_fmu_path,
                filename="Dymola/" + _fmu_path.name,
                remove=True
            ))

    return super().save_for_reproduction(
        title=title,
        path=path,
        files=files,
        **kwargs
    )
1114 def _save_to_fmu(self, fail_on_error):
1115 """Save model as an FMU"""
1116 res = self.dymola.translateModelFMU(
1117 modelToOpen=self.model_name,
1118 storeResult=False,
1119 modelName='',
1120 fmiVersion='2',
1121 fmiType='all',
1122 includeSource=False,
1123 includeImage=0
1124 )
1125 if not res:
1126 msg = "Could not export fmu: %s" % self.dymola.getLastErrorLog()
1127 self.logger.error(msg)
1128 if fail_on_error:
1129 raise Exception(msg)
1130 else:
1131 path = Path(self.cd).joinpath(res + ".fmu")
1132 return path
1134 @staticmethod
1135 def _make_modelica_normpath(path):
1136 """
1137 Convert given path to a path readable in dymola.
1138 If the base path does not exist, create it.
1140 :param str,os.path.normpath path:
1141 Either a file or a folder path. The base to this
1142 path is created in non existent.
1143 :return: str
1144 Path readable in dymola
1145 """
1146 if isinstance(path, Path):
1147 path = str(path)
1149 path = path.replace("\\", "/")
1150 # Search for e.g. "D:testzone" and replace it with D:/testzone
1151 loc = path.find(":")
1152 if path[loc + 1] != "/" and loc != -1:
1153 path = path.replace(":", ":/")
1154 return path
1156 @staticmethod
1157 def get_dymola_interface_path(dymola_install_dir):
1158 """
1159 Function to get the path of the newest dymola interface
1160 installment on the used machine
1162 :param str dymola_install_dir:
1163 The dymola installation folder. Example:
1164 "C://Program Files//Dymola 2020"
1165 :return: str
1166 Path to the dymola.egg-file or .whl file (for 2024 refresh 1 or newer versions)
1167 """
1168 path_to_interface = os.path.join(dymola_install_dir, "Modelica", "Library", "python_interface")
1169 path_to_egg_file = os.path.join(path_to_interface, "dymola.egg")
1170 if os.path.isfile(path_to_egg_file):
1171 return path_to_egg_file
1172 # Try to find .whl file:
1173 for file in os.listdir(path_to_interface):
1174 if file.endswith(".whl"):
1175 return os.path.join(path_to_interface, file)
1176 # If still here, no .egg or .whl was found
1177 raise FileNotFoundError(f"The given dymola installation directory "
1178 f"'{dymola_install_dir}' has no "
1179 f"dymola-interface .egg or .whl-file.")
1181 @staticmethod
1182 def get_dymola_exe_path(dymola_install_dir, dymola_name=None):
1183 """
1184 Function to get the path of the dymola exe-file
1185 on the current used machine.
1187 :param str dymola_install_dir:
1188 The dymola installation folder. Example:
1189 "C://Program Files//Dymola 2020"
1190 :param str dymola_name:
1191 Name of the executable. On Windows it is always Dymola.exe, on
1192 linux just dymola.
1193 :return: str
1194 Path to the dymola-exe-file.
1195 """
1196 if dymola_name is None:
1197 if "linux" in sys.platform:
1198 dymola_name = "dymola"
1199 elif "win" in sys.platform:
1200 dymola_name = "Dymola.exe"
1201 else:
1202 raise OSError(f"Your operating system {sys.platform} has no default dymola-name."
1203 f"Please provide one.")
1205 bin_64 = os.path.join(dymola_install_dir, "bin64", dymola_name)
1206 bin_32 = os.path.join(dymola_install_dir, "bin", dymola_name)
1207 if os.path.isfile(bin_64): # First check for 64bit installation
1208 dym_file = bin_64
1209 elif os.path.isfile(bin_32): # Else use the 32bit version
1210 dym_file = bin_32
1211 else:
1212 raise FileNotFoundError(
1213 f"The given dymola installation has not executable at '{bin_32}'. "
1214 f"If your dymola_path exists, please raise an issue."
1215 )
1217 return dym_file
1219 @staticmethod
1220 def get_dymola_install_paths(basedir=None):
1221 """
1222 Function to get all paths of dymola installations
1223 on the used machine. Supported platforms are:
1224 * Windows
1225 * Linux
1226 * Mac OS X
1227 If multiple installation of Dymola are found, the newest version will be returned.
1228 This assumes the names are sortable, e.g. Dymola 2020, Dymola 2019 etc.
1230 :param str basedir:
1231 The base-directory to search for the dymola-installation.
1232 The default value depends on the platform one is using.
1233 On Windows it is "C://Program Files" or "C://Program Files (x86)" (for 64 bit)
1234 On Linux it is "/opt" (based on our ci-Docker configuration
1235 On Mac OS X "/Application" (based on the default)
1236 :return: str
1237 Path to the dymola-installation
1238 """
1240 if basedir is None:
1241 if "linux" in sys.platform:
1242 basedir = os.path.normpath("/opt")
1243 elif "win" in sys.platform:
1244 basedir = os.path.normpath("C:/Program Files")
1245 elif "darwin" in sys.platform:
1246 basedir = os.path.normpath("/Applications")
1247 else:
1248 raise OSError(f"Your operating system ({sys.platform})does not support "
1249 f"a default basedir. Please provide one.")
1251 syspaths = [basedir]
1252 # Check if 64bit is installed (Windows only)
1253 systempath_64 = os.path.normpath("C://Program Files (x86)")
1254 if os.path.exists(systempath_64):
1255 syspaths.append(systempath_64)
1256 # Get all folders in both path's
1257 temp_list = []
1258 for systempath in syspaths:
1259 temp_list += os.listdir(systempath)
1260 # Filter programs that are not Dymola
1261 dym_versions = []
1262 for folder_name in temp_list:
1263 # Catch both Dymola and dymola folder-names
1264 if "dymola" in folder_name.lower():
1265 dym_versions.append(folder_name)
1266 del temp_list
1267 # Find the newest version and return the egg-file
1268 # This sorting only works with a good Folder structure, eg. Dymola 2020, Dymola 2019 etc.
1269 dym_versions.sort()
1270 valid_paths = []
1271 for dym_version in reversed(dym_versions):
1272 for system_path in syspaths:
1273 full_path = os.path.join(system_path, dym_version)
1274 if os.path.isdir(full_path):
1275 valid_paths.append(full_path)
1276 return valid_paths
1278 def _check_dymola_instances(self):
1279 """
1280 Check how many dymola instances are running on the machine.
1281 Raise a warning if the number exceeds a certain amount.
1282 """
1283 # The option may be useful. However the explicit requirement leads to
1284 # Problems on linux, therefore the feature is not worth the trouble.
1285 # pylint: disable=import-outside-toplevel
1286 try:
1287 import psutil
1288 except ImportError:
1289 return
1290 counter = 0
1291 for proc in psutil.process_iter():
1292 try:
1293 if "Dymola" in proc.name():
1294 counter += 1
1295 except (psutil.AccessDenied, psutil.NoSuchProcess):
1296 continue
1297 if counter >= self._critical_number_instances:
1298 warnings.warn("There are currently %s Dymola-Instances "
1299 "running on your machine!" % counter)
1301 @staticmethod
1302 def _alter_model_name(parameters, model_name, structural_params):
1303 """
1304 Creates a modifier for all structural parameters,
1305 based on the modelname and the initalNames and values.
1307 :param dict parameters:
1308 Parameters of the simulation
1309 :param str model_name:
1310 Name of the model to be modified
1311 :param list structural_params:
1312 List of strings with structural parameters
1313 :return: str altered_modelName:
1314 modified model name
1315 """
1316 # the structural parameter needs to be removed from paramters dict
1317 new_parameters = parameters.copy()
1318 model_name = model_name.split("(")[0] # Trim old modifier
1319 if parameters == {}:
1320 return model_name
1321 all_modifiers = []
1322 for var_name, value in parameters.items():
1323 # Check if the variable is in the
1324 # given list of structural parameters
1325 if var_name in structural_params:
1326 all_modifiers.append(f"{var_name}={value}")
1327 # removal of the structural parameter
1328 new_parameters.pop(var_name)
1329 altered_model_name = f"{model_name}({','.join(all_modifiers)})"
1330 return altered_model_name, new_parameters
1332 def _check_restart(self):
1333 """Restart Dymola every n_restart iterations in order to free memory"""
1335 if self.sim_counter == self.n_restart:
1336 self.logger.info("Closing and restarting Dymola to free memory")
1337 self.close()
1338 self._dummy_dymola_instance = self._setup_dymola_interface(dict(use_mp=False))
1339 self.sim_counter = 1
1340 else:
1341 self.sim_counter += 1
1344def _get_dymola_path_of_version(dymola_installations: list, dymola_version: str):
1345 """
1346 Helper function to get the path associated to the dymola_version
1347 from the list of all installations
1348 """
1349 for dymola_path in dymola_installations:
1350 if dymola_path.endswith(dymola_version):
1351 return dymola_path
1352 # If still here, version was not found
1353 raise ValueError(
1354 f"Given dymola_version '{dymola_version}' not found in "
1355 f"the list of dymola installations {dymola_installations}"
1356 )
1359def _get_n_available_ports(n_ports: int, start_range: int = 44000, end_range: int = 44400):
1360 """
1361 Get a specified number of available network ports within a given range.
1363 This function uses socket connections to check the availability of ports within the specified range.
1364 If the required number of open ports is found, it returns a list of those ports. If not, it raises
1365 a ConnectionError with a descriptive message indicating the failure to find the necessary ports.
1367 Parameters:
1368 - n_ports (int): The number of open ports to find.
1369 - start_range (int, optional):
1370 The starting port of the range to check (inclusive).
1371 Default is 44000.
1372 - end_range (int, optional):
1373 The ending port of the range to check (exclusive).
1374 Default is 44400.
1376 Returns:
1377 - list of int:
1378 A list containing the available ports.
1379 The length of the list is equal to 'n_ports'.
1381 Raises:
1382 - ConnectionError:
1383 If the required number of open ports cannot
1384 be found within the specified range.
1386 Example:
1388 ```
1389 try:
1390 open_ports = _get_n_available_ports(3, start_range=50000, end_range=50500)
1391 print(f"Found open ports: {open_ports}")
1392 except ConnectionError as e:
1393 print(f"Error: {e}")
1394 ```
1395 """
1396 ports = []
1397 for port in range(start_range, end_range):
1398 try:
1399 with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
1400 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1401 sock.bind(("127.0.0.1", port))
1402 ports.append(port)
1403 except OSError:
1404 pass
1405 if len(ports) == n_ports:
1406 return ports
1407 raise ConnectionError(
1408 f"Could not find {n_ports} open ports in range {start_range}-{end_range}."
1409 f"Can't open {n_ports} Dymola instances"
1410 )