Coverage for ebcpy/data_types.py: 98%

184 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-09-19 12:21 +0000

1""" 

2This module provides useful classes for all ebcpy. 

3Every data_type class should include every parameter 

4other classes like optimization etc. may need. The checking 

5of correct input is especially relevant here as the correct 

6format of data-types will prevent errors during simulations, 

7optimization etc. 

8""" 

9 

10import os 

11from pathlib import Path 

12from typing import List, Union, Any 

13from datetime import datetime 

14from pandas.core.internals import BlockManager 

15import pandas as pd 

16import numpy as np 

17import ebcpy.modelica.simres as sr 

18from ebcpy import preprocessing 

19 

20# pylint: disable=I1101 

21# pylint: disable=too-many-ancestors 

22 

23__all__ = ['TimeSeries', 

24 'TimeSeriesData', 

25 'numeric_indexes', 

26 'datetime_indexes'] 

27 

28numeric_index_dtypes = [ 

29 pd.Index([], dtype=dtype).dtype for dtype in 

30 ["int8", "int16", "int32", "int64", 

31 "uint8", "uint16", "uint32", "uint64", 

32 "float32", "float64"] 

33] 

34 

35datetime_indexes = [ 

36 pd.DatetimeIndex 

37] 

38 

39 

40def index_is_numeric(index: pd.Index): 

41 """Check if pandas Index is numeric""" 

42 return isinstance(index, pd.RangeIndex) or index.dtype in numeric_index_dtypes 

43 

44 

45class TimeSeriesData(pd.DataFrame): 

46 """ 

47 Most data related to energy and building 

48 climate related problems is time-variant. 

49 

50 Class for handling time series data using a pandas dataframe. 

51 This class works file-based and makes the import of different 

52 file-types into a pandas DataFrame more user-friendly. 

53 Furthermore, functions to support multi-indexing are provided to 

54 efficiently handle variable passed processing and provide easy 

55 visualization and preprocessing access. 

56 

57 :param str,os.path.normpath,pd.DataFrame data: 

58 Filepath ending with either .hdf, .mat, .csv, .parquet, 

59 or .parquet.COMPRESSION_NAME containing 

60 time-dependent data to be loaded as a pandas.DataFrame. 

61 Alternative option is to pass a DataFrame directly. 

62 :keyword str key: 

63 Name of the table in a .hdf-file if the file 

64 contains multiple tables. 

65 :keyword str sep: 

66 separator for the use of a csv file. If none is provided, 

67 a comma (",") is used as a default value. 

68 See pandas.read_csv() docs for further information. 

69 :keyword int, list header: 

70 Header columns for .csv files. 

71 See pandas.read_csv() docs for further information. 

72 Default is first row (0). 

73 :keyword int,str index_col: 

74 Column to be used as index in .csv files. 

75 See pandas.read_csv() docs for further information. 

76 Default is first column (0). 

77 :keyword str sheet_name: 

78 Name of the sheet you want to load data from. Required keyword 

79 argument when loading a xlsx-file. 

80 :keyword str default_tag: 

81 Which value to use as tag. Default is 'raw' 

82 :keyword str engine: 

83 Chose the engine for reading .parquet files. Default is 'pyarrow' 

84 Other option is 'fastparquet' (python>=3.9). 

85 :keyword list variable_names: 

86 List of variable names to load from .mat file. If you 

87 know which variables you want to plot, this may speed up 

88 loading significantly, and reduce memory size drastically. 

89 

90 Examples: 

91 

92 First let's see the usage for a common dataframe. 

93 

94 >>> import numpy as np 

95 >>> import pandas as pd 

96 >>> from ebcpy import TimeSeriesData 

97 >>> df = pd.DataFrame({"my_variable": np.random.rand(5)}) 

98 >>> tsd = TimeSeriesData(df) 

99 >>> tsd.to_datetime_index() 

100 >>> tsd.save("my_new_data.csv") 

101 

102 Now, let's load the recently created file. 

103 As we just created the data, we specify the tag 

104 'sim' to indicate it is some sort of simulated value. 

105 

106 >>> tsd = TimeSeriesData("my_new_data.csv", tag='sim') 

107 """ 

108 

109 # normal properties 

110 _metadata = [ 

111 "_filepath", 

112 "_loader_kwargs", 

113 "_default_tag", 

114 "_multi_col_names" 

115 ] 

116 

117 def __init__(self, data: Union[str, Any], **kwargs): 

118 """Initialize class-objects and check correct input.""" 

119 # Initialize as default 

120 self._filepath = None 

121 self._loader_kwargs = {} 

122 self._multi_col_names = ["Variables", "Tags"] 

123 

124 self._default_tag = kwargs.pop("default_tag", "raw") 

125 if not isinstance(self._default_tag, str): 

126 raise TypeError(f"Invalid type for default_tag! Expected 'str' but " 

127 f"received {type(self._default_tag)}") 

128 

129 # Two possibles inputs. first argument is actually data provided by pandas 

130 # and kwargs hold further information or is it an actual filepath. 

131 if isinstance(data, BlockManager): 

132 super().__init__(data=data) 

133 return 

134 

135 if not isinstance(data, (str, Path)): 

136 _df_loaded = pd.DataFrame(data=data, 

137 index=kwargs.get("index", None), 

138 columns=kwargs.get("columns", None), 

139 dtype=kwargs.get("dtype", None), 

140 copy=kwargs.get("copy", False)) 

141 else: 

142 file = Path(data) 

143 self._loader_kwargs = kwargs.copy() 

144 _df_loaded = self._load_df_from_file(file=file) 

145 self._filepath = file 

146 

147 if _df_loaded.columns.nlevels == 1: 

148 # Check if first level is named Tags. 

149 # If so, don't create MultiIndex-DF as the method is called by the pd constructor 

150 if _df_loaded.columns.name != self._multi_col_names[1]: 

151 multi_col = pd.MultiIndex.from_product( 

152 [_df_loaded.columns, [self._default_tag]], 

153 names=self._multi_col_names 

154 ) 

155 _df_loaded.columns = multi_col 

156 

157 elif _df_loaded.columns.nlevels == 2: 

158 if _df_loaded.columns.names != self._multi_col_names: 

159 raise TypeError("Loaded dataframe has a different 2-Level " 

160 "header format than it is supported by this " 

161 "class. The names have to match.") 

162 else: 

163 raise TypeError("Only DataFrames with Multi-Columns with 2 " 

164 "Levels are supported by this class.") 

165 

166 super().__init__(_df_loaded) 

167 

168 @property 

169 def _constructor(self): 

170 """Overwrite constructor method according to: 

171 https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extending-subclassing-pandas""" 

172 return TimeSeriesData 

173 

174 @property 

175 def _constructor_sliced(self): 

176 """Overwrite constructor method according to: 

177 https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extending-subclassing-pandas""" 

178 return TimeSeries 

179 

180 @property 

181 def filepath(self) -> str: 

182 """Get the filepath associated with the time series data""" 

183 return self._filepath 

184 

185 @filepath.setter 

186 def filepath(self, filepath: str): 

187 """Set the filepath associated with the time series data""" 

188 self._filepath = Path(filepath) 

189 

190 @property 

191 def default_tag(self) -> str: 

192 """Get the default of time series data object""" 

193 return self._default_tag 

194 

195 @default_tag.setter 

196 def default_tag(self, tag: str) -> None: 

197 """Set the default_tag of the time series data object 

198 :param tag: new tag 

199 :type tag: String 

200 """ 

201 if not isinstance(tag, str): 

202 raise TypeError(f"Invalid type for default_tag! Expected 'str' but " 

203 f"received {type(tag)}") 

204 if tag not in self.get_tags(): 

205 raise KeyError(f"Tag '{tag}' does not exist for current data set!" 

206 f"\n Available tags: {self.get_tags()}") 

207 self._default_tag = tag 

208 

209 def save(self, filepath: str = None, **kwargs) -> None: 

210 """ 

211 Save the current time-series-data into the given file-format. 

212 Currently supported are .hdf, which is an easy and fast storage, 

213 and, .csv is supported as an easy-readable option. 

214 Also, .parquet, and with additional compression .parquet.COMPRESSION_NAME 

215 are supported. Compressions could be gzip, brotli or snappy. For all possible 

216 compressions see the documentation of the parquet engines. 

217 For a small comparison of these data formats see https://github.com/RWTH-EBC/ebcpy/issues/81 

218 

219 :param str,os.path.normpath filepath: 

220 Filepath were to store the data. Either .hdf, .csv, .parquet 

221 or .parquet.COMPRESSION_NAME has to be the file-ending. 

222 Default is current filepath of class. 

223 :keyword str key: 

224 Necessary keyword-argument for saving a .hdf-file. 

225 Specifies the key of the table in the .hdf-file. 

226 :keyword str sep: 

227 Separator used for saving as .csv. Default is ','. 

228 :keyword str engine: 

229 Chose the engine for reading .parquet files. Default is 'pyarrow' 

230 Other option is 'fastparquet' (python>=3.9). 

231 :return: 

232 """ 

233 # If new settings are needed, update existing ones 

234 self._loader_kwargs.update(kwargs) 

235 # Set filepath if not given 

236 if filepath is None: 

237 filepath = self.filepath 

238 else: 

239 filepath = Path(filepath) 

240 # Check if filepath is still None (if no filepath was used in init) 

241 if filepath is None: 

242 raise ValueError("Current TimeSeriesData instance " 

243 "has no filepath, please specify one.") 

244 # Save based on file suffix 

245 if filepath.suffix == ".hdf": 

246 if "key" not in kwargs: 

247 raise KeyError("Argument 'key' must be " 

248 "specified to save a .hdf file") 

249 pd.DataFrame(self).to_hdf(filepath, key=kwargs.get("key")) 

250 

251 elif filepath.suffix == ".csv": 

252 pd.DataFrame(self).to_csv(filepath, sep=kwargs.get("sep", ",")) 

253 elif ".parquet" in filepath.name: 

254 parquet_split = filepath.name.split(".parquet") 

255 pd.DataFrame(self).to_parquet( 

256 filepath, engine=kwargs.get('engine', 'pyarrow'), 

257 compression=parquet_split[-1][1:] if parquet_split[-1] else None, 

258 index=True) 

259 else: 

260 raise TypeError("Given file-format is not supported." 

261 "You can only store TimeSeriesData as .hdf, .csv, .parquet, " 

262 "and .parquet.COMPRESSION_NAME with additional compression options") 

263 

264 def to_df(self, force_single_index=False): 

265 """ 

266 Return the dataframe version of the current TimeSeriesData object. 

267 If all tags are equal, the tags are dropped. 

268 Else, the object is just converted. 

269 

270 :param bool force_single_index: 

271 If True (not the default), the conversion to a standard 

272 DataFrame with a single index column (only variable names) 

273 is only done if no variable contains multiple tags. 

274 """ 

275 if len(self.get_variables_with_multiple_tags()) == 0: 

276 return pd.DataFrame(self.droplevel(1, axis=1)) 

277 if force_single_index: 

278 raise IndexError( 

279 "Can't automatically drop all tags " 

280 "as the following variables contain multiple tags: " 

281 f"{' ,'.join(self.get_variables_with_multiple_tags())}. " 

282 ) 

283 return pd.DataFrame(self) 

284 

285 def _load_df_from_file(self, file): 

286 """Function to load a given filepath into a dataframe""" 

287 # Check whether the file exists 

288 if not os.path.isfile(file): 

289 raise FileNotFoundError( 

290 f"The given filepath {file} could not be opened") 

291 

292 # Open based on file suffix. 

293 # Currently, hdf, csv, and Modelica result files (mat) are supported. 

294 if file.suffix == ".hdf": 

295 # Load the current file as a hdf to a dataframe. 

296 # As specifying the key can be a problem, the user will 

297 # get all keys of the file if one is necessary but not provided. 

298 key = self._loader_kwargs.get("key") 

299 if key == "": 

300 key = None # Avoid cryptic error in pandas by converting empty string to None 

301 try: 

302 df = pd.read_hdf(file, key=key) 

303 except (ValueError, KeyError) as error: 

304 keys = ", ".join(get_keys_of_hdf_file(file)) 

305 raise KeyError(f"key must be provided when HDF5 file contains multiple datasets. " 

306 f"Here are all keys in the given hdf-file: {keys}") from error 

307 elif file.suffix == ".csv": 

308 # Check if file was previously a TimeSeriesData object 

309 with open(file, "r") as _f: 

310 lines = [_f.readline() for _ in range(2)] 

311 if (lines[0].startswith(self._multi_col_names[0]) and 

312 lines[1].startswith(self._multi_col_names[1])): 

313 _hea_def = [0, 1] 

314 else: 

315 _hea_def = 0 

316 

317 df = pd.read_csv( 

318 file, 

319 sep=self._loader_kwargs.get("sep", ","), 

320 index_col=self._loader_kwargs.get("index_col", 0), 

321 header=self._loader_kwargs.get("header", _hea_def) 

322 ) 

323 elif file.suffix == ".mat": 

324 df = sr.mat_to_pandas( 

325 fname=file, 

326 with_unit=False, 

327 names=self._loader_kwargs.get("variable_names") 

328 ) 

329 elif file.suffix in ['.xlsx', '.xls', '.odf', '.ods', '.odt']: 

330 sheet_name = self._loader_kwargs.get("sheet_name") 

331 if sheet_name is None: 

332 raise KeyError("sheet_name is a required keyword argument to load xlsx-files." 

333 "Please pass a string to specify the name " 

334 "of the sheet you want to load.") 

335 df = pd.read_excel(io=file, sheet_name=sheet_name) 

336 elif ".parquet" in file.name: 

337 df = pd.read_parquet(path=file, engine=self._loader_kwargs.get('engine', 'pyarrow')) 

338 else: 

339 raise TypeError("Only .hdf, .csv, .xlsx and .mat are supported!") 

340 if not isinstance(df.index, tuple(datetime_indexes)) and not index_is_numeric(df.index): 

341 try: 

342 df.index = pd.DatetimeIndex(df.index) 

343 except Exception as err: 

344 raise IndexError( 

345 f"Given data has index of type {type(df.index)}. " 

346 f"Currently only numeric indexes and the following are supported:" 

347 f"{' ,'.join([str(idx) for idx in [pd.RangeIndex] + datetime_indexes])} " 

348 f"Automatic conversion to pd.DateTimeIndex failed" 

349 f"see error above." 

350 ) from err 

351 return df 

352 

353 def get_variable_names(self) -> List[str]: 

354 """ 

355 Return an alphabetically sorted list of all variables 

356 

357 :return: List[str] 

358 """ 

359 return sorted(self.columns.get_level_values(0).unique()) 

360 

361 def get_variables_with_multiple_tags(self) -> List[str]: 

362 """ 

363 Return an alphabetically sorted list of all variables 

364 that contain more than one tag. 

365 

366 :return: List[str] 

367 """ 

368 var_names = self.columns.get_level_values(0) 

369 return sorted(var_names[var_names.duplicated()]) 

370 

371 def get_tags(self, variable: str = None) -> List[str]: 

372 """ 

373 Return an alphabetically sorted list of all tags 

374 

375 :param str variable: 

376 If given, tags of this variable are returned 

377 

378 :return: List[str] 

379 """ 

380 if variable: 

381 tags = self.loc[:, variable].columns 

382 return sorted(tags) 

383 return sorted(self.columns.get_level_values(1).unique()) 

384 

385 def get_columns_by_tag(self, 

386 tag: str, 

387 variables: list = None, 

388 return_type: str = 'pandas', 

389 drop_level: bool = False): 

390 """ 

391 Returning all columns with defined tag in the form of ndarray. 

392 

393 :param str tag: 

394 Define the tag which return columns have to 

395 match. 

396 :param list variables: 

397 Besides the given tag, specify the 

398 variables names matching the return criteria as well. 

399 :param boolean drop_level: 

400 If tag should be included in the response. 

401 Default is True. 

402 :param str return_type: 

403 Return format. Options are: 

404 - pandas (pd.series) 

405 - numpy, scipy, sp, and np (np.array) 

406 - control (transposed np.array) 

407 :return: ndarray of input signals 

408 """ 

409 # Extract columns 

410 if variables: 

411 _ret = self.loc[:, variables] 

412 else: 

413 _ret = self 

414 

415 _ret = _ret.xs(tag, axis=1, level=1, drop_level=drop_level) 

416 

417 # Return based on the given return_type 

418 if return_type.lower() == 'pandas': 

419 return _ret 

420 if return_type.lower() in ['numpy', 'scipy', 'sp', 'np']: 

421 return _ret.to_numpy() 

422 if return_type.lower() == 'control': 

423 return _ret.to_numpy().transpose() 

424 raise TypeError("Unknown return type") 

425 

426 def to_datetime_index(self, unit_of_index="s", origin=datetime.now(), inplace: bool = True): 

427 """ 

428 Convert the current index to a float based index using 

429 ebcpy.preprocessing.convert_index_to_datetime_index() 

430 

431 :param str unit_of_index: default 's' 

432 The unit of the given index. Used to convert to 

433 total_seconds later on. 

434 :param datetime.datetime origin: 

435 The reference datetime object for the first index. 

436 Default is the current system time. 

437 :param bool inplace: 

438 If True, performs operation inplace and returns None. 

439 :return: df 

440 Copy of DataFrame with correct index for usage in this 

441 framework. 

442 

443 """ 

444 return preprocessing.convert_index_to_datetime_index(df=self, 

445 unit_of_index=unit_of_index, 

446 origin=origin, 

447 inplace=inplace) 

448 

449 def to_float_index(self, offset=0, inplace: bool = True): 

450 """ 

451 Convert the current index to a float based index using 

452 ebcpy.preprocessing.convert_datetime_index_to_float_index() 

453 

454 :param float offset: 

455 Offset in seconds 

456 :param bool inplace: 

457 If True, performs operation inplace and returns None. 

458 :return: pd.DataFrame df: 

459 DataFrame with correct index. 

460 """ 

461 if not isinstance(self.index, pd.DatetimeIndex): 

462 return 

463 

464 return preprocessing.convert_datetime_index_to_float_index(df=self, 

465 offset=offset, 

466 inplace=inplace) 

467 

468 def clean_and_space_equally(self, desired_freq, inplace: bool = True): 

469 """ 

470 Call to the preprocessing function 

471 ebcpy.preprocessing.clean_and_space_equally_time_series() 

472 See the docstring of this function to know what is happening. 

473 

474 :param str desired_freq: 

475 Frequency to determine number of elements in processed dataframe. 

476 Options are for example: 

477 - s: second-based 

478 - 5s: Every 5 seconds 

479 - 6min: Every 6 minutes 

480 This also works for h, d, m, y, ms etc. 

481 :param bool inplace: 

482 If True, performs operation inplace and returns None. 

483 :return: pd.DataFrame 

484 Cleaned and equally spaced data-frame 

485 """ 

486 df = preprocessing.clean_and_space_equally_time_series(df=self, 

487 desired_freq=desired_freq) 

488 if inplace: 

489 super().__init__(df) 

490 return None 

491 else: 

492 return df 

493 

494 def low_pass_filter(self, crit_freq, filter_order, variable, 

495 tag=None, new_tag="low_pass_filter"): 

496 """ 

497 Call to the preprocessing function 

498 ebcpy.preprocessing.low_pass_filter() 

499 See the docstring of this function to know what is happening. 

500 

501 :param float crit_freq: 

502 The critical frequency or frequencies. 

503 :param int filter_order: 

504 The order of the filter 

505 :param str variable: 

506 The variable name to apply the filter to 

507 :param str tag: 

508 If this variable has more than one tag, specify which one 

509 :param str new_tag: 

510 The new tag to pass to the variable. 

511 Default is 'low_pass_filter' 

512 """ 

513 if tag is None: 

514 data = self.loc[:, variable].to_numpy() 

515 else: 

516 data = self.loc[:, (variable, tag)].to_numpy() 

517 

518 result = preprocessing.low_pass_filter( 

519 data=data, 

520 filter_order=filter_order, 

521 crit_freq=crit_freq 

522 ) 

523 self.loc[:, (variable, new_tag)] = result 

524 

525 def moving_average(self, window, variable, 

526 tag=None, new_tag="moving_average"): 

527 """ 

528 Call to the preprocessing function 

529 ebcpy.preprocessing.moving_average() 

530 See the docstring of this function to know what is happening. 

531 

532 :param int window: 

533 sample rate of input 

534 :param str variable: 

535 The variable name to apply the filter to 

536 :param str tag: 

537 If this variable has more than one tag, specify which one 

538 :param str new_tag: 

539 The new tag to pass to the variable. 

540 Default is 'low_pass_filter' 

541 """ 

542 if tag is None: 

543 data = self.loc[:, variable].to_numpy() 

544 else: 

545 data = self.loc[:, (variable, tag)].to_numpy() 

546 

547 result = preprocessing.moving_average( 

548 data=data, 

549 window=window, 

550 ) 

551 self.loc[:, (variable, new_tag)] = result 

552 

553 def number_lines_totally_na(self): 

554 """ 

555 Returns the number of rows in the given dataframe 

556 that are filled with NaN-values. 

557 """ 

558 return preprocessing.number_lines_totally_na(self) 

559 

560 @property 

561 def frequency(self): 

562 """ 

563 The frequency of the time series data. 

564 Returns's the mean and the standard deviation of 

565 the index. 

566 

567 :returns: 

568 float: Mean value 

569 float: Standard deviation 

570 """ 

571 return preprocessing.get_df_index_frequency_mean_and_std( 

572 df_index=self.index 

573 ) 

574 

575 

576class TimeSeries(pd.Series): 

577 """Overwrites pd.Series to enable correct slicing 

578 and expansion in the TimeSeriesData class 

579 

580 .. versionadded:: 0.1.7 

581 """ 

582 

583 @property 

584 def _constructor(self): 

585 """Overwrite constructor method according to: 

586 https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extending-subclassing-pandas""" 

587 return TimeSeries 

588 

589 @property 

590 def _constructor_expanddim(self): 

591 """Overwrite constructor method according to: 

592 https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extending-subclassing-pandas""" 

593 return TimeSeriesData 

594 

595 

596def get_keys_of_hdf_file(filepath): 

597 """ 

598 Find all keys in a given hdf-file. 

599 

600 :param str,os.path.normpath filepath: 

601 Path to the .hdf-file 

602 :return: list 

603 List with all keys in the given file. 

604 """ 

605 # pylint: disable=import-outside-toplevel 

606 try: 

607 import h5py 

608 with h5py.File(filepath, 'r') as hdf_file: 

609 return list(hdf_file.keys()) 

610 except ImportError: 

611 return ["ERROR: Could not obtain keys as h5py is not installed"]