Coverage for addmo/s1_data_tuning_auto/feature_selection.py: 21% (71 statements)


import numpy as np
import pandas as pd

from sklearn.decomposition import FastICA
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import (
    RFE,
    GenericUnivariateSelect,
    VarianceThreshold,
    f_regression,
    mutual_info_regression,
)
from sklearn.model_selection import cross_val_score

from addmo.s1_data_tuning_auto.config.data_tuning_auto_config import DataTuningAutoSetup



def manual_feature_select(config: DataTuningAutoSetup, x):
    """
    Manual selection of features.
    """
    return x[config.selected_features]



def filter_low_variance(config: DataTuningAutoSetup, x):
    """
    Pre-filter removing features with low variance.
    For documentation see scikit-learn.org.
    """
    selector = (
        VarianceThreshold(threshold=config.low_variance_threshold)
        .set_output(transform="pandas")
        .fit(X=x)
    )  # fit the filter
    x_processed = selector.transform(X=x)  # transform the data
    return x_processed


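# Illustrative usage sketch (not part of the original module): filter_low_variance
# on a toy frame. `SimpleNamespace` is a hypothetical stand-in for a
# DataTuningAutoSetup instance; only the attribute read above is provided.
def _demo_filter_low_variance():
    from types import SimpleNamespace

    x = pd.DataFrame({
        "constant": [1.0, 1.0, 1.0, 1.0],  # zero variance, should be dropped
        "varying": [0.0, 1.0, 2.0, 3.0],
    })
    config = SimpleNamespace(low_variance_threshold=0.1)
    print(filter_low_variance(config, x).columns.tolist())  # expected: ['varying']

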

def filter_ica(x):
    """
    Filter by Independent Component Analysis (ICA).
    """
    ica = FastICA(max_iter=1000)
    x_transformed = ica.fit_transform(X=x)
    # The original column labels are reused for downstream compatibility,
    # although each column now holds an independent component rather than
    # an original feature.
    x_processed = pd.DataFrame(x_transformed, columns=x.columns, index=x.index)
    return x_processed


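# Illustrative usage sketch (assumption, not original code): filter_ica applied
# to two linearly mixed signals, the classic ICA unmixing setting.
def _demo_filter_ica():
    rng = np.random.default_rng(0)
    sources = rng.uniform(-1.0, 1.0, size=(200, 2))       # two independent sources
    mixed = sources @ np.array([[1.0, 0.5], [0.3, 1.0]])  # observed mixtures
    x = pd.DataFrame(mixed, columns=["sensor_a", "sensor_b"])
    print(filter_ica(x).head())

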

def filter_univariate(config: DataTuningAutoSetup, x, y):
    """
    Univariate filter with scoring function f-test or mutual information
    and search mode: {'percentile', 'k_best', 'fpr', 'fdr', 'fwe'}.
    For documentation see scikit-learn.org.
    """
    score_function_map = {
        "mutual_info_regression": mutual_info_regression,
        "f_regression": f_regression,
    }

    score_func = score_function_map.get(config.univariate_score_function)
    if score_func is None:
        raise ValueError(
            f"Invalid score function '{config.univariate_score_function}'. "
            "Must be one of: 'mutual_info_regression', 'f_regression'."
        )

    selector = GenericUnivariateSelect(
        score_func=score_func,
        mode=config.univariate_search_mode,
        param=config.univariate_filter_params,
    ).set_output(transform="pandas")

    selector = selector.fit(X=x, y=y)
    x_processed = selector.transform(X=x)
    return x_processed


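# Illustrative usage sketch (hypothetical config values): selecting the single
# best of two features via the f_regression path of filter_univariate. Attribute
# names mirror the config fields read above.
def _demo_filter_univariate():
    from types import SimpleNamespace

    rng = np.random.default_rng(1)
    x = pd.DataFrame({
        "informative": rng.normal(size=100),
        "noise": rng.normal(size=100),
    })
    y = 3.0 * x["informative"] + rng.normal(scale=0.1, size=100)
    config = SimpleNamespace(
        univariate_score_function="f_regression",
        univariate_search_mode="k_best",
        univariate_filter_params=1,
    )
    print(filter_univariate(config, x, y).columns.tolist())  # expected: ['informative']

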

# Embedded feature selection by recursive feature elimination (feature subset selection, multivariate)
def recursive_feature_selection_by_count(config: DataTuningAutoSetup, x, y):
    """
    Embedded feature selection by recursive feature elimination (multivariate)
    based on the number of features to select.
    For documentation see scikit-learn.org.
    """
    model = RandomForestRegressor(random_state=42)
    min_features_to_select = config.recursive_embedded_number_features_to_select

    n_features = x.shape[1]
    current_features = list(range(n_features))

    while len(current_features) > min_features_to_select:
        # Ask RFE to drop exactly one feature: with n_features_to_select equal to
        # the current count, nothing is eliminated and ranking_ is all ones, so
        # the least important feature could never be identified.
        selector = RFE(estimator=model, n_features_to_select=len(current_features) - 1)
        selector = selector.fit(x.iloc[:, current_features], y)

        scores = cross_val_score(model, x.iloc[:, current_features], y, cv=5, scoring='r2')
        mean_score = np.mean(scores)
        print(f"Features: {len(current_features)}, CV Score: {mean_score:.4f}")

        # The eliminated feature carries the highest rank
        ranking = selector.ranking_
        least_important_feature = int(np.argmax(ranking))
        current_features.pop(least_important_feature)

    print(f"Selected {len(current_features)} features after recursive elimination.")
    return x.iloc[:, current_features]


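# Illustrative usage sketch (hypothetical config): eliminating features down to a
# fixed count with recursive_feature_selection_by_count on synthetic data.
def _demo_recursive_by_count():
    from types import SimpleNamespace

    from sklearn.datasets import make_regression

    x_arr, y_arr = make_regression(n_samples=100, n_features=6, n_informative=2,
                                   noise=0.5, random_state=0)
    x = pd.DataFrame(x_arr, columns=[f"f{i}" for i in range(6)])
    config = SimpleNamespace(recursive_embedded_number_features_to_select=2)
    selected = recursive_feature_selection_by_count(config, x, pd.Series(y_arr))
    print(selected.columns.tolist())

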

def recursive_feature_selection_by_score(config: DataTuningAutoSetup, x, y):
    """
    Recursive feature elimination based on score improvement.
    Stops when the cross-validation score increase falls below the configured threshold.
    """
    model = RandomForestRegressor(random_state=42)
    min_increase = config.min_increase_for_wrapper

    n_features = x.shape[1]
    current_features = list(range(n_features))
    last_score = -np.inf
    best_features = current_features.copy()
    best_score = last_score

    while len(current_features) > 1:  # stop when only one feature is left
        # Drop exactly one feature per iteration (see recursive_feature_selection_by_count)
        selector = RFE(estimator=model, n_features_to_select=len(current_features) - 1)
        selector = selector.fit(x.iloc[:, current_features], y)

        # Evaluate with cross-validation
        scores = cross_val_score(model, x.iloc[:, current_features], y, cv=5, scoring='r2')
        mean_score = np.mean(scores)
        print(f"Features: {len(current_features)}, CV Score: {mean_score:.4f}")

        score_improvement = mean_score - last_score

        # Stop if the score improvement is too small
        if score_improvement < min_increase:
            print("Score improvement below threshold. Stopping.")
            break

        # Update the best feature set
        best_score = mean_score
        best_features = current_features.copy()
        last_score = mean_score

        # Eliminate the least important feature (highest rank)
        ranking = selector.ranking_
        least_important_feature = int(np.argmax(ranking))
        current_features.pop(least_important_feature)

    print(f"Selected {len(best_features)} features with best CV score: {best_score:.4f}")
    return x.iloc[:, best_features]
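

# Illustrative usage sketch (assumption, not part of the original module): run the
# score-based elimination end to end. `SimpleNamespace` again stands in for a
# DataTuningAutoSetup instance; the real class may require further fields.
if __name__ == "__main__":
    from types import SimpleNamespace

    from sklearn.datasets import make_regression

    x_arr, y_arr = make_regression(n_samples=200, n_features=8, n_informative=3,
                                   noise=0.5, random_state=42)
    x_demo = pd.DataFrame(x_arr, columns=[f"f{i}" for i in range(8)])
    demo_config = SimpleNamespace(min_increase_for_wrapper=0.001)
    selected = recursive_feature_selection_by_score(demo_config, x_demo, pd.Series(y_arr))
    print(selected.columns.tolist())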