Coverage for ebcpy/utils/conversion.py: 100%

68 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-09-19 12:21 +0000

1""" 

2Module with functions to convert 

3certain format into other formats. 

4""" 

5import pathlib 

6import scipy.io as spio 

7import numpy as np 

8import pandas as pd 

9 

10from ebcpy.data_types import index_is_numeric, datetime_indexes 

11 

12 

def convert_tsd_to_modelica_mat(tsd, save_path_file, **kwargs):
    """
    Function to convert a tsd to a mat-file readable within Dymola.

    :param TimeSeriesData tsd:
        TimeSeriesData object
    :param str,os.path.normpath save_path_file:
        File path and name where to store the output .mat file.
        A ``pathlib.Path`` is converted to ``str`` before use.
    :keyword list columns:
        A list with names of columns that should be saved to .mat file.
        If no list is provided, all columns are converted.
    :keyword float offset:
        Offset for time in seconds, default 0

    :raises ValueError:
        If ``save_path_file`` does not end with ".mat".

    :return:
        str,os.path.normpath:
            Path where the data is saved.
            Equal to save_path_file

    Examples:

    >>> import os
    >>> from ebcpy import TimeSeriesData
    >>> project_dir = os.path.dirname(os.path.dirname(__file__))
    >>> example_file = os.path.normpath(project_dir + "//tests//data//example_data.csv")
    >>> save_path = os.path.normpath(project_dir + "//tests//data//example_data_converted.mat")
    >>> cols = ["sine.freqHz / Hz"]
    >>> tsd = TimeSeriesData(example_file, sep=";")
    >>> filepath = convert_tsd_to_modelica_mat(tsd,
    ...                                        save_path, columns=cols)
    >>> os.remove(filepath)
    """
    if isinstance(save_path_file, pathlib.Path):
        save_path_file = str(save_path_file)

    if not save_path_file.endswith(".mat"):
        raise ValueError("Given savepath for mat-file is not a .mat file!")

    # Load the relevant part of the df; the returned headers are not needed here
    df_sub, _ = _convert_to_subset(
        df=tsd,
        columns=kwargs.get("columns"),
        offset=kwargs.get("offset", 0),
    )

    # Convert np.array into a list and create a dict with 'table' as matrix name
    new_mat = {'table': df_sub.values.tolist()}
    # Save matrix as a MATLAB *.mat file (version 4), which is readable by Modelica.
    spio.savemat(save_path_file, new_mat, format="4")
    return save_path_file

66 

67 

def convert_tsd_to_clustering_txt(tsd, save_path_file, columns=None):
    """
    Function to convert a TimeSeriesData object
    to a txt-file readable within the TICC-module.

    :param TimeSeriesData tsd:
        TimeSeriesData object
    :param str,os.path.normpath save_path_file:
        File path and name where to store the output .txt file.
    :param list columns:
        A list with names of columns that should be saved to the .txt file.
        If no list is provided, all columns are converted.

    :return:
        str,os.path.normpath:
            Path where the data is saved.
            Equal to save_path_file

    Examples:

    >>> import os
    >>> from ebcpy import TimeSeriesData
    >>> project_dir = os.path.dirname(os.path.dirname(__file__))
    >>> example_file = os.path.normpath(project_dir + "//tests//data//example_data.csv")
    >>> save_path = os.path.normpath(project_dir + "//tests//data//example_data_converted.txt")
    >>> cols = ["sine.freqHz / Hz"]
    >>> tsd = TimeSeriesData(example_file, sep=";")
    >>> filepath = convert_tsd_to_clustering_txt(tsd,
    ...                                          save_path, columns=cols)
    >>> os.remove(filepath)
    """
    # Get the subset of the dataFrame (time column first, no time offset)
    df_sub, _ = _convert_to_subset(df=tsd, columns=columns, offset=0)

    # Save the values as a comma-separated *.txt file with four decimals,
    # which is readable by TICC.
    np.savetxt(save_path_file, df_sub, delimiter=',', fmt='%.4f')
    return save_path_file

109 

110 

def convert_tsd_to_modelica_txt(tsd, table_name, save_path_file, **kwargs):
    """
    Convert a TimeSeriesData object to modelica readable text. This is especially useful
    for generating input data for a modelica simulation.

    :param TimeSeriesData tsd:
        TimeSeriesData object
    :param str table_name:
        Name of the table for modelica.
        Needed in Modelica to correctly load the file.
    :param str,os.path.normpath save_path_file:
        File path and name where to store the output .txt file.
        A ``pathlib.Path`` is converted to ``str`` before use.
    :keyword list columns:
        A list with names of columns that should be saved to the .txt file.
        If no list is provided, all columns are converted.
    :keyword float offset:
        Offset for time in seconds, default 0
    :keyword str sep:
        Separator used to separate values between columns, default tab.
    :keyword Boolean with_tag:
        If True (default), each variable and its tag are written
        to the header line as "variable_tag".
        If False, only the variable name is written. The time column
        always keeps its tag, as the unit is important.

    :raises ValueError:
        If ``save_path_file`` does not end with ".txt".

    :return:
        str,os.path.normpath:
            Path where the data is saved.
            Equal to save_path_file

    Examples:

    >>> import os
    >>> from ebcpy import TimeSeriesData
    >>> project_dir = os.path.dirname(os.path.dirname(__file__))
    >>> example_file = os.path.normpath(project_dir + "//tests//data//example_data.csv")
    >>> save_path = os.path.normpath(project_dir + "//tests//data//example_data_converted.txt")
    >>> cols = ["sine.freqHz / Hz"]
    >>> tsd = TimeSeriesData(example_file, sep=";")
    >>> filepath = convert_tsd_to_modelica_txt(tsd, "dummy_input_data", save_path, columns=cols)
    >>> os.remove(filepath)
    """
    if isinstance(save_path_file, pathlib.Path):
        save_path_file = str(save_path_file)
    if not save_path_file.endswith(".txt"):
        raise ValueError("Given savepath for txt-file is not a .txt file!")

    # Load the relevant part of the df
    df_sub, headers = _convert_to_subset(
        df=tsd,
        columns=kwargs.get("columns"),
        offset=kwargs.get("offset", 0),
    )

    # Unpack kwargs
    sep = kwargs.get("sep", "\t")

    if kwargs.get("with_tag", True):
        # Convert ("variable", "tag") to "variable_tag"
        column_names = ["_".join(variable_tag) for variable_tag in headers]
    else:
        # First entry is the time column: keep its tag as the unit is important
        column_names = ["_".join(variable_tag) for variable_tag in headers[:1]]
        # Convert ("variable", "tag") to "variable"
        column_names += [variable_tag[0] for variable_tag in headers[1:]]

    content_as_lines = [
        "#1\n",  # Print Modelica table no
        f"double {table_name}({len(df_sub.index)}, {len(headers)})\n",
        f"#{sep.join(column_names)}\n",  # Comment header line
    ]

    # Mode "w" discards possible old content before the header is written
    with open(file=save_path_file, mode="w", encoding="utf-8") as file:
        file.writelines(content_as_lines)

    # Append the data directly using to_csv from pandas
    df_sub.to_csv(save_path_file, header=False, index=False, sep=sep, mode="a")

    return save_path_file

196 

197 

def _convert_to_subset(df, columns, offset):
    """
    Private function to ensure lean conversion to either mat or txt.

    Works on a copy of ``df``, adds a zero-based time column
    ``('time', 'in_s')`` built from the index and returns only the
    requested columns with the time column in first place.

    :param df:
        Data to convert; the callers in this module pass a TimeSeriesData object.
    :param str,list columns:
        Column name or list of column names to keep.
        If empty or None, all columns are used.
    :param float offset:
        Value added to the zero-based time column.

    :raises IndexError:
        If the index is neither a datetime index nor numeric.
    :raises ValueError:
        If the selected columns contain NaN values.

    :return:
        tuple:
            The subset of the data (time column first) and the
            list of column headers, each one as a tuple.
    """
    df = df.copy()
    if columns:
        if isinstance(columns, str):
            columns = [columns]  # Must be a list
        headers = df[columns].columns.values.tolist()
    else:
        headers = df.columns.values.tolist()

    _time_header = ('time', 'in_s')
    headers.insert(0, _time_header)  # Ensure time will be at first place

    if isinstance(df.index, tuple(datetime_indexes)):
        df.index = df.index - df.index[0]  # Make index zero based
        df[_time_header] = df.index.total_seconds() + offset
    elif index_is_numeric(df.index):
        df[_time_header] = df.index - df.index[0] + offset
    else:
        # Should not happen as error is raised in data_types. But just to be sure:
        raise IndexError(f"Given index of type {type(df.index)} is not supported.")
    # Avoid 1e-8 errors in timedelta calculation.
    df[_time_header] = df[_time_header].round(4)

    df_sub = df.loc[:, headers]

    # Check if nan values occur
    if df_sub.isnull().values.any():
        raise ValueError("Selected columns contain NaN values. This would lead to errors "
                         "in the simulation environment.")

    # Convert cases with no tag to tuple
    headers_as_tuple = [
        header if isinstance(header, tuple) else (header, )
        for header in headers
    ]

    return df_sub, headers_as_tuple