Coverage for ebcpy/utils/conversion.py: 100%
68 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-09-19 12:21 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-09-19 12:21 +0000
1"""
2Module with functions to convert
3certain format into other formats.
4"""
5import pathlib
6import scipy.io as spio
7import numpy as np
8import pandas as pd
10from ebcpy.data_types import index_is_numeric, datetime_indexes
def convert_tsd_to_modelica_mat(tsd, save_path_file, **kwargs):
    """
    Function to convert a tsd to a mat-file readable within Dymola.

    The selected data is stored as a matrix named ``table`` in a
    MATLAB version 4 mat-file. The first column of the matrix is the time.

    :param TimeSeriesData tsd:
        TimeSeriesData object
    :param str,pathlib.Path save_path_file:
        File path and name where to store the output .mat file.
    :keyword list columns:
        A list with names of columns that should be saved to .mat file.
        If no list is provided, all columns are converted.
    :keyword float offset:
        Offset for time in seconds, default 0
    :raises ValueError:
        If save_path_file does not end with ".mat".

    :return:
        str:
            Path where the data is saved.
            Equal to save_path_file (converted to str).

    Examples:

    >>> import os
    >>> from ebcpy import TimeSeriesData
    >>> project_dir = os.path.dirname(os.path.dirname(__file__))
    >>> example_file = os.path.normpath(project_dir + "//tests//data//example_data.csv")
    >>> save_path = os.path.normpath(project_dir + "//tests//data//example_data_converted.mat")
    >>> cols = ["sine.freqHz / Hz"]
    >>> tsd = TimeSeriesData(example_file, sep=";")
    >>> filepath = convert_tsd_to_modelica_mat(tsd,
    ...                                        save_path, columns=cols)
    >>> os.remove(filepath)
    """
    # The suffix check below relies on str methods, so normalize Path first.
    if isinstance(save_path_file, pathlib.Path):
        save_path_file = str(save_path_file)

    if not save_path_file.endswith(".mat"):
        raise ValueError("Given savepath for mat-file is not a .mat file!")

    # Load the relevant part of the df; the header names are not needed here.
    df_sub, _ = _convert_to_subset(
        df=tsd,
        columns=kwargs.get("columns"),
        offset=kwargs.get("offset", 0),
    )

    # Store the values as nested lists under the matrix name 'table' and
    # save them as a version 4 *.mat file, which is readable by Modelica.
    spio.savemat(save_path_file, {"table": df_sub.values.tolist()}, format="4")
    return save_path_file
def convert_tsd_to_clustering_txt(tsd, save_path_file, columns=None):
    """
    Function to convert a TimeSeriesData object
    to a txt-file readable within the TICC-module.

    The values are written comma-separated with four decimal places.
    The first column is the time; no time offset is applied.

    :param TimeSeriesData tsd:
        TimeSeriesData object
    :param str,os.path.normpath save_path_file:
        File path and name where to store the output .txt file.
    :param list columns:
        A list with names of columns that should be saved to .txt file.
        If no list is provided, all columns are converted.

    :return:
        str,os.path.normpath:
            Path where the data is saved.
            Equal to save_path_file

    Examples:

    >>> import os
    >>> from ebcpy import TimeSeriesData
    >>> project_dir = os.path.dirname(os.path.dirname(__file__))
    >>> example_file = os.path.normpath(project_dir + "//tests//data//example_data.csv")
    >>> save_path = os.path.normpath(project_dir + "//tests//data//example_data_converted.txt")
    >>> cols = ["sine.freqHz / Hz"]
    >>> tsd = TimeSeriesData(example_file, sep=";")
    >>> filepath = convert_tsd_to_clustering_txt(tsd,
    ...                                          save_path, columns=cols)
    >>> os.remove(filepath)
    """
    # Get the subset of the dataFrame; the header names are not needed here.
    df_sub, _ = _convert_to_subset(df=tsd, columns=columns, offset=0)
    # Save matrix as a *.txt file, which is readable by TICC.
    np.savetxt(save_path_file, df_sub, delimiter=',', fmt='%.4f')
    return save_path_file
def convert_tsd_to_modelica_txt(tsd, table_name, save_path_file, **kwargs):
    """
    Convert a TimeSeriesData object to modelica readable text. This is especially useful
    for generating input data for a modelica simulation.

    The written file starts with three header lines (``#1``, the table
    declaration ``double <table_name>(<rows>, <cols>)`` and a comment line
    with the column names), followed by the data without header and index.

    :param TimeSeriesData tsd:
        TimeSeriesData object
    :param str table_name:
        Name of the table for modelica.
        Needed in Modelica to correctly load the file.
    :param str,pathlib.Path save_path_file:
        File path and name where to store the output .txt file.
    :keyword list columns:
        A list with names of columns that should be saved to .txt file.
        If no list is provided, all columns are converted.
    :keyword float offset:
        Offset for time in seconds, default 0
    :keyword str sep:
        Separator used to separate values between columns, default tab.
    :keyword Boolean with_tag:
        If True (default), each variable and its tag are written to the
        comment line as "variable_tag".
        If False, only the variable name is written to the file.
        The time column is always written with its tag, as the unit is important.
    :raises ValueError:
        If save_path_file does not end with ".txt".

    :return:
        str:
            Path where the data is saved.
            Equal to save_path_file (converted to str).

    Examples:

    >>> import os
    >>> from ebcpy import TimeSeriesData
    >>> project_dir = os.path.dirname(os.path.dirname(__file__))
    >>> example_file = os.path.normpath(project_dir + "//tests//data//example_data.csv")
    >>> save_path = os.path.normpath(project_dir + "//tests//data//example_data_converted.txt")
    >>> cols = ["sine.freqHz / Hz"]
    >>> tsd = TimeSeriesData(example_file, sep=";")
    >>> filepath = convert_tsd_to_modelica_txt(tsd, "dummy_input_data", save_path, columns=cols)
    >>> os.remove(filepath)
    """
    # The suffix check below relies on str methods, so normalize Path first.
    if isinstance(save_path_file, pathlib.Path):
        save_path_file = str(save_path_file)
    if not save_path_file.endswith(".txt"):
        raise ValueError("Given savepath for txt-file is not a .txt file!")

    # Load the relevant part of the df
    df_sub, headers = _convert_to_subset(
        df=tsd,
        columns=kwargs.get("columns"),
        offset=kwargs.get("offset", 0),
    )

    # Unpack kwargs
    sep = kwargs.get("sep", "\t")

    if kwargs.get("with_tag", True):
        # Convert ("variable", "tag") to "variable_tag"
        names = ["_".join(variable_tag) for variable_tag in headers]
    else:
        # Keep the tag only for the time column (first entry), as its unit
        # is important; all other columns are reduced to the variable name.
        names = [
            "_".join(variable_tag) if idx == 0 else variable_tag[0]
            for idx, variable_tag in enumerate(headers)
        ]

    content_as_lines = [
        "#1\n",  # Modelica table number
        f"double {table_name}({len(df_sub.index)}, {len(headers)})\n",
        f"#{sep.join(names)}\n",  # Comment header line
    ]

    # Mode "w" creates the file or deletes possible old content.
    with open(file=save_path_file, mode="w", encoding="utf-8") as file:
        file.writelines(content_as_lines)

    # Append the data directly using to_csv from pandas. The path (not the
    # handle above) is passed so that pandas handles the line endings itself.
    df_sub.to_csv(save_path_file, header=None, index=None, sep=sep, mode="a")

    return save_path_file
def _convert_to_subset(df, columns, offset):
    """
    Private function to ensure lean conversion to either mat or txt.

    Works on a copy of ``df``: selects the requested columns and adds a
    time column ``('time', 'in_s')`` in front of them. The time is
    counted from the first index entry, shifted by ``offset`` and
    rounded to four decimals.

    :param pd.DataFrame df:
        Data to convert. The index must be a datetime index or numeric.
    :param str,list columns:
        Name or list of names of the columns to select.
        If empty or None, all columns are used.
    :param float offset:
        Offset added to the time column.
    :raises IndexError:
        If the index is neither a datetime index nor numeric.
    :raises ValueError:
        If the selected columns (including time) contain NaN values.
    :return:
        tuple (df_sub, headers_as_tuple):
            The subset with the time as first column, and the list of
            its column names, each one converted to a tuple.
    """
    df = df.copy()
    if columns:
        if isinstance(columns, str):
            columns = [columns]  # Must be a list
        headers = df[columns].columns.values.tolist()
    else:
        headers = df.columns.values.tolist()

    _time_header = ('time', 'in_s')
    headers.insert(0, _time_header)  # Ensure time will be at first place

    if isinstance(df.index, tuple(datetime_indexes)):
        # Make index zero based. Subtracting the first label directly avoids
        # building a full row via iloc and keeps timezone information intact.
        df.index = df.index - df.index[0]
        df[_time_header] = df.index.total_seconds() + offset
    elif index_is_numeric(df.index):
        df[_time_header] = df.index - df.index[0] + offset
    else:
        # Should not happen as error is raised in data_types. But just to be sure:
        raise IndexError(f"Given index of type {type(df.index)} is not supported.")
    # Avoid 1e-8 errors in timedelta calculation.
    df[_time_header] = df[_time_header].round(4)

    df_sub = df.loc[:, headers]
    # Check if nan values occur
    if df_sub.isnull().values.any():
        raise ValueError("Selected columns contain NaN values. This would lead to errors "
                         "in the simulation environment.")

    # Convert cases with no tag to tuple
    headers_as_tuple = [
        header if isinstance(header, tuple) else (header, )
        for header in headers
    ]

    return df_sub, headers_as_tuple