Coverage for ebcpy/data_types.py: 98%

188 statements  


1""" 

2This module provides useful classes for all ebcpy. 

3Every data_type class should include every parameter 

4other classes like optimization etc. may need. The checking 

5of correct input is especially relevant here as the correct 

6format of data-types will prevent errors during simulations, 

7optimization etc. 

8""" 

import os
from pathlib import Path
from typing import List, Union, Any
from datetime import datetime
from pandas.core.internals import BlockManager
import pandas as pd
import numpy as np
import ebcpy.modelica.simres as sr
from ebcpy import preprocessing
from ebcpy.utils import get_names

# pylint: disable=I1101
# pylint: disable=too-many-ancestors

__all__ = ['TimeSeries',
           'TimeSeriesData',
           'numeric_index_dtypes',
           'datetime_indexes']

numeric_index_dtypes = [
    pd.Index([], dtype=dtype).dtype for dtype in
    ["int8", "int16", "int32", "int64",
     "uint8", "uint16", "uint32", "uint64",
     "float32", "float64"]
]

datetime_indexes = [
    pd.DatetimeIndex
]

def index_is_numeric(index: pd.Index):
    """Check if a pandas Index is numeric."""
    return isinstance(index, pd.RangeIndex) or index.dtype in numeric_index_dtypes
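# Editor's sketch (illustrative, not part of the original module): a plain
# RangeIndex counts as numeric, while a DatetimeIndex does not.
#   >>> index_is_numeric(pd.RangeIndex(5))
#   True
#   >>> index_is_numeric(pd.DatetimeIndex(["2024-01-01"]))
#   False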

class TimeSeriesData(pd.DataFrame):
    """
    Most data related to energy and building
    climate related problems is time-variant.

    Class for handling time series data using a pandas DataFrame.
    This class works file-based and makes the import of different
    file-types into a pandas DataFrame more user-friendly.
    Furthermore, functions to support multi-indexing are provided to
    efficiently handle variable-based processing and provide easy
    visualization and preprocessing access.

    :param str,os.path.normpath,pd.DataFrame data:
        Filepath ending with either .hdf, .mat, .csv, .parquet,
        or .parquet.COMPRESSION_NAME containing
        time-dependent data to be loaded as a pandas.DataFrame.
        Alternative option is to pass a DataFrame directly.
    :keyword str key:
        Name of the table in a .hdf-file if the file
        contains multiple tables.
    :keyword str sep:
        Separator for the use of a csv file. If none is provided,
        a comma (",") is used as the default value.
        See pandas.read_csv() docs for further information.
    :keyword int,list header:
        Header columns for .csv files.
        See pandas.read_csv() docs for further information.
        Default is first row (0).
    :keyword int,str index_col:
        Column to be used as index in .csv files.
        See pandas.read_csv() docs for further information.
        Default is first column (0).
    :keyword str sheet_name:
        Name of the sheet you want to load data from. Required keyword
        argument when loading an xlsx-file.
    :keyword str default_tag:
        Which value to use as tag. Default is 'raw'.
    :keyword str engine:
        Choose the engine for reading .parquet files. Default is 'pyarrow'.
        Other option is 'fastparquet' (python>=3.9).
    :keyword list variable_names:
        List of variable names to load from a .mat file. If you
        know which variables you want to plot, this may speed up
        loading significantly and reduce memory size drastically.
        You can also supply wildcard patterns (e.g. "*wall.layer[*].T", etc.)
        to match multiple variables at once.

    Examples:

    First, let's see the usage for a common DataFrame.

    >>> import numpy as np
    >>> import pandas as pd
    >>> from ebcpy import TimeSeriesData
    >>> df = pd.DataFrame({"my_variable": np.random.rand(5)})
    >>> tsd = TimeSeriesData(df)
    >>> tsd.to_datetime_index()
    >>> tsd.save("my_new_data.csv")

    Now, let's load the recently created file.
    As we just created the data, we specify the tag
    'sim' to indicate it is some sort of simulated value.

    >>> tsd = TimeSeriesData("my_new_data.csv", default_tag='sim')
    """

    # normal properties
    _metadata = [
        "_filepath",
        "_loader_kwargs",
        "_default_tag",
        "_multi_col_names"
    ]

    def __init__(self, data: Union[str, Any], **kwargs):
        """Initialize class-objects and check correct input."""
        # Initialize as default
        self._filepath = None
        self._loader_kwargs = {}
        self._multi_col_names = ["Variables", "Tags"]

        self._default_tag = kwargs.pop("default_tag", "raw")
        if not isinstance(self._default_tag, str):
            raise TypeError(f"Invalid type for default_tag! Expected 'str' but "
                            f"received {type(self._default_tag)}")

        # Two possible inputs: either the first argument is internal data
        # passed by the pandas constructor (with kwargs holding further
        # information), or it is an actual filepath.
        if isinstance(data, BlockManager):
            super().__init__(data=data)
            return

        if not isinstance(data, (str, Path)):
            _df_loaded = pd.DataFrame(data=data,
                                      index=kwargs.get("index", None),
                                      columns=kwargs.get("columns", None),
                                      dtype=kwargs.get("dtype", None),
                                      copy=kwargs.get("copy", False))
        else:
            file = Path(data)
            self._loader_kwargs = kwargs.copy()
            _df_loaded = self._load_df_from_file(file=file)
            self._filepath = file

        if _df_loaded.columns.nlevels == 1:
            # Check if the single column level is named "Tags".
            # If so, don't create a MultiIndex-DF, as the method is being
            # called by the pandas constructor.
            if _df_loaded.columns.name != self._multi_col_names[1]:
                multi_col = pd.MultiIndex.from_product(
                    [_df_loaded.columns, [self._default_tag]],
                    names=self._multi_col_names
                )
                _df_loaded.columns = multi_col

        elif _df_loaded.columns.nlevels == 2:
            if _df_loaded.columns.names != self._multi_col_names:
                raise TypeError("Loaded dataframe has a different 2-level "
                                "header format than is supported by this "
                                "class. The names have to match.")
        else:
            raise TypeError("Only DataFrames with multi-columns with 2 "
                            "levels are supported by this class.")

        super().__init__(_df_loaded)
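    # Editor's sketch: constructing from a single-level DataFrame yields a
    # two-level column index named ("Variables", "Tags"), e.g.
    #   >>> TimeSeriesData(pd.DataFrame({"T": [1.0]})).columns.tolist()
    #   [('T', 'raw')]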

    @property
    def _constructor(self):
        """Overwrite constructor method according to:
        https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extending-subclassing-pandas"""
        return TimeSeriesData

    @property
    def _constructor_sliced(self):
        """Overwrite constructor method according to:
        https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extending-subclassing-pandas"""
        return TimeSeries

    @property
    def filepath(self) -> str:
        """Get the filepath associated with the time series data."""
        return self._filepath

    @filepath.setter
    def filepath(self, filepath: str):
        """Set the filepath associated with the time series data."""
        self._filepath = Path(filepath)

    @property
    def default_tag(self) -> str:
        """Get the default tag of the time series data object."""
        return self._default_tag

    @default_tag.setter
    def default_tag(self, tag: str) -> None:
        """Set the default_tag of the time series data object.

        :param tag: new tag
        :type tag: String
        """
        if not isinstance(tag, str):
            raise TypeError(f"Invalid type for default_tag! Expected 'str' but "
                            f"received {type(tag)}")
        if tag not in self.get_tags():
            raise KeyError(f"Tag '{tag}' does not exist for current data set!"
                           f"\n Available tags: {self.get_tags()}")
        self._default_tag = tag

    def save(self, filepath: str = None, **kwargs) -> None:
        """
        Save the current time-series data in the given file format.
        Currently supported are .hdf, which is an easy and fast storage
        option, and .csv, which is an easily readable option.
        Also, .parquet and, with additional compression,
        .parquet.COMPRESSION_NAME are supported. Compressions could be
        gzip, brotli or snappy. For all possible compressions see the
        documentation of the parquet engines.
        For a small comparison of these data formats see
        https://github.com/RWTH-EBC/ebcpy/issues/81

        :param str,os.path.normpath filepath:
            Filepath where to store the data. Either .hdf, .csv, .parquet
            or .parquet.COMPRESSION_NAME has to be the file ending.
            Default is the current filepath of the class.
        :keyword str key:
            Necessary keyword argument for saving a .hdf-file.
            Specifies the key of the table in the .hdf-file.
        :keyword str sep:
            Separator used for saving as .csv. Default is ','.
        :keyword str engine:
            Choose the engine for writing .parquet files. Default is 'pyarrow'.
            Other option is 'fastparquet' (python>=3.9).
        :return: None
        """
        # If new settings are needed, update existing ones
        self._loader_kwargs.update(kwargs)
        # Set filepath if not given
        if filepath is None:
            filepath = self.filepath
        else:
            filepath = Path(filepath)
        # Check if filepath is still None (if no filepath was used in init)
        if filepath is None:
            raise ValueError("Current TimeSeriesData instance "
                             "has no filepath, please specify one.")
        # Save based on file suffix
        if filepath.suffix == ".hdf":
            if "key" not in kwargs:
                raise KeyError("Argument 'key' must be "
                               "specified to save a .hdf file")
            pd.DataFrame(self).to_hdf(filepath, key=kwargs.get("key"))
        elif filepath.suffix == ".csv":
            pd.DataFrame(self).to_csv(filepath, sep=kwargs.get("sep", ","))
        elif ".parquet" in filepath.name:
            parquet_split = filepath.name.split(".parquet")
            pd.DataFrame(self).to_parquet(
                filepath, engine=kwargs.get('engine', 'pyarrow'),
                compression=parquet_split[-1][1:] if parquet_split[-1] else None,
                index=True)
        else:
            raise TypeError("Given file format is not supported. "
                            "You can only store TimeSeriesData as .hdf, .csv, .parquet, "
                            "and .parquet.COMPRESSION_NAME with additional compression options")

    def to_df(self, force_single_index=False):
        """
        Return the DataFrame version of the current TimeSeriesData object.
        If no variable contains multiple tags, the tag level is dropped.
        Else, the object is just converted.

        :param bool force_single_index:
            If True (not the default), an IndexError is raised when the
            single-index conversion (columns holding only variable names)
            is impossible because some variable contains multiple tags.
        """
        if len(self.get_variables_with_multiple_tags()) == 0:
            return pd.DataFrame(self.droplevel(1, axis=1))
        if force_single_index:
            raise IndexError(
                "Can't automatically drop all tags "
                "as the following variables contain multiple tags: "
                f"{', '.join(self.get_variables_with_multiple_tags())}. "
            )
        return pd.DataFrame(self)

    def _load_df_from_file(self, file):
        """Load the given filepath into a DataFrame."""
        # Check whether the file exists
        if not os.path.isfile(file):
            raise FileNotFoundError(
                f"The given filepath {file} could not be opened")

        # Open based on file suffix.
        # Currently, hdf, csv, parquet, xlsx, and Modelica result files (mat)
        # are supported.
        if file.suffix == ".hdf":
            # Load the current file as an hdf into a DataFrame.
            # As specifying the key can be a problem, the user will
            # get all keys of the file if one is necessary but not provided.
            key = self._loader_kwargs.get("key")
            if key == "":
                key = None  # Avoid cryptic error in pandas by converting empty string to None
            try:
                df = pd.read_hdf(file, key=key)
            except (ValueError, KeyError) as error:
                keys = ", ".join(get_keys_of_hdf_file(file))
                raise KeyError(f"key must be provided when HDF5 file contains multiple datasets. "
                               f"Here are all keys in the given hdf-file: {keys}") from error
        elif file.suffix == ".csv":
            # Check if the file was previously a TimeSeriesData object
            with open(file, "r") as _f:
                lines = [_f.readline() for _ in range(2)]
            if (lines[0].startswith(self._multi_col_names[0]) and
                    lines[1].startswith(self._multi_col_names[1])):
                _hea_def = [0, 1]
            else:
                _hea_def = 0

            df = pd.read_csv(
                file,
                sep=self._loader_kwargs.get("sep", ","),
                index_col=self._loader_kwargs.get("index_col", 0),
                header=self._loader_kwargs.get("header", _hea_def)
            )
        elif file.suffix == ".mat":
            df = sr.mat_to_pandas(
                fname=file,
                with_unit=False,
                names=self._loader_kwargs.get("variable_names")
            )
        elif file.suffix in ['.xlsx', '.xls', '.odf', '.ods', '.odt']:
            sheet_name = self._loader_kwargs.get("sheet_name")
            if sheet_name is None:
                raise KeyError("sheet_name is a required keyword argument to load xlsx-files. "
                               "Please pass a string to specify the name "
                               "of the sheet you want to load.")
            df = pd.read_excel(io=file, sheet_name=sheet_name)
        elif ".parquet" in file.name:
            df = pd.read_parquet(path=file, engine=self._loader_kwargs.get('engine', 'pyarrow'))
        else:
            raise TypeError("Only .hdf, .csv, .xlsx, .parquet, and .mat are supported!")
        if not isinstance(df.index, tuple(datetime_indexes)) and not index_is_numeric(df.index):
            try:
                df.index = pd.DatetimeIndex(df.index)
            except Exception as err:
                raise IndexError(
                    f"Given data has index of type {type(df.index)}. "
                    f"Currently, only numeric indexes and the following are supported: "
                    f"{', '.join([str(idx) for idx in [pd.RangeIndex] + datetime_indexes])}. "
                    f"Automatic conversion to pd.DatetimeIndex failed, "
                    f"see error above."
                ) from err
        return df
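    # Editor's note (illustrative): a .csv previously written by save() starts
    # with the two header rows "Variables,..." and "Tags,...", which is what
    # the startswith() sniffing above uses to pick header=[0, 1] over header=0.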

    def get_variable_names(self, patterns: Union[str, List[str]] = None) -> List[str]:
        """
        Return an alphabetically sorted list of variable names, optionally filtered by patterns.

        By default, returns all variable names found in the first level of the DataFrame's
        column MultiIndex, sorted alphabetically. If `patterns` is provided, only names
        matching one or more of the given literal strings or glob-style patterns
        (where `*` matches any sequence of characters) will be returned.

        :param patterns:
            - A single string or list of strings.
            - Each entry may be an exact variable name, or a pattern containing `*` as a wildcard.
            - If None, all variable names are returned.
        :return:
            A list of matching variable names, in alphabetical order.
        :raises KeyError:
            If any literal name or pattern does not match at least one variable in the DataFrame.

        Example:
            # return all wall temperatures at any layer
            tsd.get_variable_names("*wall.layer[*].T")
            ["wall.layer[1].T", "wall.layer[2].T", "wall.layer[3].T"]
        """
        all_names = sorted(self.columns.get_level_values(0).unique())
        if patterns is None:
            return all_names
        return get_names(all_names, patterns)

    def get_variables_with_multiple_tags(self) -> List[str]:
        """
        Return an alphabetically sorted list of all variables
        that contain more than one tag.

        :return: List[str]
        """
        var_names = self.columns.get_level_values(0)
        return sorted(var_names[var_names.duplicated()])

    def get_tags(self, variable: str = None) -> List[str]:
        """
        Return an alphabetically sorted list of all tags.

        :param str variable:
            If given, tags of this variable are returned.

        :return: List[str]
        """
        if variable:
            tags = self.loc[:, variable].columns
            return sorted(tags)
        return sorted(self.columns.get_level_values(1).unique())
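    # Editor's sketch: with columns [("T", "raw"), ("T", "sim")],
    #   tsd.get_tags()                          # -> ["raw", "sim"]
    #   tsd.get_tags(variable="T")              # -> ["raw", "sim"]
    #   tsd.get_variables_with_multiple_tags()  # -> ["T"]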

    def get_columns_by_tag(self,
                           tag: str,
                           variables: list = None,
                           return_type: str = 'pandas',
                           drop_level: bool = False):
        """
        Return all columns that match the given tag.

        :param str tag:
            Define the tag which the returned columns have to match.
        :param list variables:
            Besides the given tag, specify the
            variable names matching the return criteria as well.
        :param boolean drop_level:
            If True, the tag level is dropped from the returned
            columns. Default is False.
        :param str return_type:
            Return format. Options are:
            - pandas (pd.DataFrame)
            - numpy, scipy, sp, and np (np.array)
            - control (transposed np.array)
        :return: Columns matching the tag, in the requested format.
        """
        # Extract columns
        if variables:
            _ret = self.loc[:, variables]
        else:
            _ret = self

        _ret = _ret.xs(tag, axis=1, level=1, drop_level=drop_level)

        # Return based on the given return_type
        if return_type.lower() == 'pandas':
            return _ret
        if return_type.lower() in ['numpy', 'scipy', 'sp', 'np']:
            return _ret.to_numpy()
        if return_type.lower() == 'control':
            return _ret.to_numpy().transpose()
        raise TypeError("Unknown return type")
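    # Editor's sketch (shapes assume d variables carrying the tag over n rows):
    #   tsd.get_columns_by_tag("raw", return_type="np")       # shape (n, d)
    #   tsd.get_columns_by_tag("raw", return_type="control")  # shape (d, n)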

    def to_datetime_index(self, unit_of_index="s", origin=None, inplace: bool = True):
        """
        Convert the current index to a datetime based index using
        ebcpy.preprocessing.convert_index_to_datetime_index()

        :param str unit_of_index: default 's'
            The unit of the given index. Used to convert to
            total_seconds later on.
        :param datetime.datetime origin:
            The reference datetime object for the first index.
            Default is the current system time.
        :param bool inplace:
            If True, performs operation inplace and returns None.
        :return: df
            Copy of DataFrame with correct index for usage in this
            framework.
        """
        if origin is None:
            # Evaluate the default at call time; a datetime.now() default in
            # the signature would be frozen at module-import time.
            origin = datetime.now()
        return preprocessing.convert_index_to_datetime_index(df=self,
                                                             unit_of_index=unit_of_index,
                                                             origin=origin,
                                                             inplace=inplace)

    def to_float_index(self, offset=0, inplace: bool = True):
        """
        Convert the current index to a float based index using
        ebcpy.preprocessing.convert_datetime_index_to_float_index()

        :param float offset:
            Offset in seconds
        :param bool inplace:
            If True, performs operation inplace and returns None.
        :return: pd.DataFrame df:
            DataFrame with correct index.
        """
        if not isinstance(self.index, pd.DatetimeIndex):
            return

        return preprocessing.convert_datetime_index_to_float_index(df=self,
                                                                   offset=offset,
                                                                   inplace=inplace)
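    # Editor's sketch: the two index conversions act as near-inverses, e.g.
    #   tsd.to_datetime_index(origin=datetime(2024, 1, 1))  # float -> datetime
    #   tsd.to_float_index()  # datetime -> seconds since the first timestamp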

    def clean_and_space_equally(self, desired_freq, inplace: bool = True):
        """
        Call to the preprocessing function
        ebcpy.preprocessing.clean_and_space_equally_time_series()
        See the docstring of this function to know what is happening.

        :param str desired_freq:
            Frequency to determine the number of elements in the processed
            dataframe. Options are, for example:
            - s: second-based
            - 5s: every 5 seconds
            - 6min: every 6 minutes
            This also works for h, d, m, y, ms etc.
        :param bool inplace:
            If True, performs operation inplace and returns None.
        :return: pd.DataFrame
            Cleaned and equally spaced data-frame
        """
        df = preprocessing.clean_and_space_equally_time_series(df=self,
                                                               desired_freq=desired_freq)
        if inplace:
            super().__init__(df)
            return None
        return df

    def low_pass_filter(self, crit_freq, filter_order, variable,
                        tag=None, new_tag="low_pass_filter"):
        """
        Call to the preprocessing function
        ebcpy.preprocessing.low_pass_filter()
        See the docstring of this function to know what is happening.

        :param float crit_freq:
            The critical frequency or frequencies.
        :param int filter_order:
            The order of the filter.
        :param str variable:
            The variable name to apply the filter to.
        :param str tag:
            If this variable has more than one tag, specify which one.
        :param str new_tag:
            The new tag to pass to the variable.
            Default is 'low_pass_filter'.
        """
        if tag is None:
            data = self.loc[:, variable].to_numpy()
        else:
            data = self.loc[:, (variable, tag)].to_numpy()

        result = preprocessing.low_pass_filter(
            data=data,
            filter_order=filter_order,
            crit_freq=crit_freq
        )
        self.loc[:, (variable, new_tag)] = result

    def moving_average(self, window, variable,
                       tag=None, new_tag="moving_average"):
        """
        Call to the preprocessing function
        ebcpy.preprocessing.moving_average()
        See the docstring of this function to know what is happening.

        :param int window:
            Sample rate of input.
        :param str variable:
            The variable name to apply the filter to.
        :param str tag:
            If this variable has more than one tag, specify which one.
        :param str new_tag:
            The new tag to pass to the variable.
            Default is 'moving_average'.
        """
        if tag is None:
            data = self.loc[:, variable].to_numpy()
        else:
            data = self.loc[:, (variable, tag)].to_numpy()

        result = preprocessing.moving_average(
            data=data,
            window=window,
        )
        self.loc[:, (variable, new_tag)] = result
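    # Editor's sketch: both filter methods keep the source series and append
    # the result under a new tag, e.g.
    #   tsd.moving_average(window=10, variable="T", tag="raw")
    #   tsd.get_tags("T")  # -> ["moving_average", "raw"]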

    def number_lines_totally_na(self):
        """
        Returns the number of rows in the given dataframe
        that are filled with NaN-values.
        """
        return preprocessing.number_lines_totally_na(self)

    @property
    def frequency(self):
        """
        The frequency of the time series data.
        Returns the mean and the standard deviation of
        the index.

        :returns:
            float: Mean value
            float: Standard deviation
        """
        return preprocessing.get_df_index_frequency_mean_and_std(
            df_index=self.index
        )
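    # Editor's sketch (assuming the step statistics are reported in seconds):
    # an equally spaced index with a 1 s step would give
    #   tsd.frequency  # -> (1.0, 0.0), i.e. mean step 1 s, zero deviation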

class TimeSeries(pd.Series):
    """Overwrites pd.Series to enable correct slicing
    and expansion in the TimeSeriesData class.

    .. versionadded:: 0.1.7
    """

    @property
    def _constructor(self):
        """Overwrite constructor method according to:
        https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extending-subclassing-pandas"""
        return TimeSeries

    @property
    def _constructor_expanddim(self):
        """Overwrite constructor method according to:
        https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extending-subclassing-pandas"""
        return TimeSeriesData
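# Editor's note (illustrative): per the pandas subclassing guide linked above,
# the _constructor / _constructor_sliced / _constructor_expanddim trio tells
# pandas which class to build when an operation changes dimensionality, so
# slicing a TimeSeriesData column yields a TimeSeries and expanding a
# TimeSeries yields a TimeSeriesData again.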

def get_keys_of_hdf_file(filepath):
    """
    Find all keys in a given hdf-file.

    :param str,os.path.normpath filepath:
        Path to the .hdf-file
    :return: list
        List with all keys in the given file.
    """
    # pylint: disable=import-outside-toplevel
    try:
        import h5py
        with h5py.File(filepath, 'r') as hdf_file:
            return list(hdf_file.keys())
    except ImportError:
        return ["ERROR: Could not obtain keys as h5py is not installed"]
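

if __name__ == "__main__":
    # Editor's sketch of the round-trip workflow from the class docstring;
    # the variable name "T_room" and the file name are hypothetical.
    _tsd = TimeSeriesData(pd.DataFrame({"T_room": np.random.rand(10)}))
    _tsd.to_datetime_index()      # numeric index -> DatetimeIndex
    _tsd.save("t_room_demo.csv")  # writes the two-row Variables/Tags header
    _reloaded = TimeSeriesData("t_room_demo.csv", default_tag="sim")
    print(_reloaded.get_variable_names())  # ['T_room']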