Coverage for addmo/s1_data_tuning_auto/feature_selection.py: 21% (71 statements)
import pandas as pd
import numpy as np
from sklearn.decomposition import FastICA
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import (
    RFE,
    GenericUnivariateSelect,
    VarianceThreshold,
    f_regression,
    mutual_info_regression,
)
from sklearn.model_selection import cross_val_score
from addmo.s1_data_tuning_auto.config.data_tuning_auto_config import DataTuningAutoSetup


def manual_feature_select(config: DataTuningAutoSetup, x):
    """
    Manual selection of the features listed in the config.
    """
    return x[config.selected_features]
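# Illustrative usage sketch (not part of the module). The DataTuningAutoSetup
# construction is hypothetical; it assumes only that the config exposes a
# `selected_features` list of column names:
# >>> df = pd.DataFrame({"a": [1, 2], "b": [3, 4], "c": [5, 6]})
# >>> cfg = DataTuningAutoSetup(selected_features=["a", "c"])  # hypothetical kwargs
# >>> manual_feature_select(cfg, df).columns.tolist()
# ['a', 'c']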


def filter_low_variance(config: DataTuningAutoSetup, x):
    """
    Pre-filter that removes features with low variance.
    For documentation see scikit-learn.org.
    """
    # `selector` avoids shadowing the built-in `filter`.
    selector = VarianceThreshold(threshold=config.low_variance_threshold).set_output(
        transform="pandas"
    )
    selector.fit(X=x)  # learn per-feature variances
    x_processed = selector.transform(X=x)  # drop columns at or below the threshold
    return x_processed
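# Illustrative sketch: a constant column has zero variance and is dropped at
# threshold 0.0. The config construction is hypothetical:
# >>> df = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [5.0, 5.0, 5.0]})
# >>> cfg = DataTuningAutoSetup(low_variance_threshold=0.0)  # hypothetical kwargs
# >>> filter_low_variance(cfg, df).columns.tolist()
# ['a']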


def filter_ica(x):
    """
    Filter by Independent Component Analysis (ICA).
    """
    ica = FastICA(max_iter=1000)
    x_transformed = ica.fit_transform(X=x)
    # With the default n_components, FastICA yields min(n_samples, n_features)
    # components. Reusing the original labels and index keeps the frame's shape
    # familiar downstream, but each column is a mixture, not an original feature.
    x_processed = pd.DataFrame(x_transformed, columns=x.columns, index=x.index)
    return x_processed
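# Illustrative sketch: the returned frame keeps the input's shape and labels,
# while the values are independent components.
# >>> rng = np.random.default_rng(0)
# >>> df = pd.DataFrame(rng.normal(size=(100, 3)), columns=["a", "b", "c"])
# >>> filter_ica(df).shape
# (100, 3)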


def filter_univariate(config: DataTuningAutoSetup, x, y):
    """
    Univariate filter with scoring function f-test or mutual information
    and search mode: {'percentile', 'k_best', 'fpr', 'fdr', 'fwe'}.
    For documentation see scikit-learn.org.
    """
    score_function_map = {
        "mutual_info_regression": mutual_info_regression,
        "f_regression": f_regression,
    }

    score_func = score_function_map.get(config.univariate_score_function)
    if score_func is None:
        raise ValueError(
            f"Invalid score function '{config.univariate_score_function}'. "
            "Must be one of: 'mutual_info_regression', 'f_regression'."
        )

    selector = GenericUnivariateSelect(
        score_func=score_func,
        mode=config.univariate_search_mode,
        param=config.univariate_filter_params,
    ).set_output(transform="pandas")

    selector = selector.fit(X=x, y=y)
    x_processed = selector.transform(X=x)
    return x_processed
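# Illustrative sketch: keep the two features most related to y by F-test.
# The config construction is hypothetical:
# >>> rng = np.random.default_rng(0)
# >>> X = pd.DataFrame(rng.normal(size=(50, 4)), columns=list("abcd"))
# >>> y = pd.Series(3 * X["a"] - 2 * X["c"] + rng.normal(scale=0.1, size=50))
# >>> cfg = DataTuningAutoSetup(
# ...     univariate_score_function="f_regression",
# ...     univariate_search_mode="k_best",
# ...     univariate_filter_params=2,
# ... )  # hypothetical kwargs
# >>> sorted(filter_univariate(cfg, X, y).columns)
# ['a', 'c']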


# Embedded feature selection by recursive feature elimination (feature subset selection, multivariate).
def recursive_feature_selection_by_count(config: DataTuningAutoSetup, x, y):
    """
    Embedded feature selection by recursive feature elimination (multivariate),
    driven by the number of features to keep.
    For documentation see scikit-learn.org.
    """
    model = RandomForestRegressor(random_state=42)
    min_features_to_select = config.recursive_embedded_number_features_to_select

    n_features = x.shape[1]
    current_features = list(range(n_features))

    while len(current_features) > min_features_to_select:
        # Ask RFE to drop exactly one feature per pass: the eliminated feature
        # is ranked 2, all kept features are ranked 1. (Selecting all current
        # features would rank everything 1 and always pop column 0.)
        selector = RFE(estimator=model, n_features_to_select=len(current_features) - 1)
        selector = selector.fit(x.iloc[:, current_features], y)

        scores = cross_val_score(model, x.iloc[:, current_features], y, cv=5, scoring='r2')
        mean_score = np.mean(scores)
        print(f"Features: {len(current_features)}, CV Score: {mean_score:.4f}")

        # Drop the feature RFE ranked last.
        ranking = selector.ranking_
        least_important_feature = np.where(ranking == max(ranking))[0][0]
        current_features.pop(least_important_feature)

    print(f"Selected {len(current_features)} features after recursive elimination.")
    return x.iloc[:, current_features]
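# Illustrative sketch: from four features, eliminate one per pass until two
# remain. The config construction is hypothetical:
# >>> rng = np.random.default_rng(0)
# >>> X = pd.DataFrame(rng.normal(size=(60, 4)), columns=list("abcd"))
# >>> y = pd.Series(X["a"] + 0.5 * X["b"] + rng.normal(scale=0.1, size=60))
# >>> cfg = DataTuningAutoSetup(recursive_embedded_number_features_to_select=2)  # hypothetical
# >>> recursive_feature_selection_by_count(cfg, X, y).shape[1]
# 2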


def recursive_feature_selection_by_score(config: DataTuningAutoSetup, x, y):
    """
    Recursive feature elimination based on score improvement.
    Stops when the cross-validation score gain falls below the configured threshold.
    """
    model = RandomForestRegressor(random_state=42)
    min_increase = config.min_increase_for_wrapper

    n_features = x.shape[1]
    current_features = list(range(n_features))
    last_score = -np.inf
    best_features = current_features.copy()
    best_score = last_score

    while len(current_features) > 1:  # stop when only one feature is left
        # Ask RFE to drop exactly one feature per pass (see the note in
        # recursive_feature_selection_by_count).
        selector = RFE(estimator=model, n_features_to_select=len(current_features) - 1)
        selector = selector.fit(x.iloc[:, current_features], y)

        # Evaluate the current feature set with cross-validation
        scores = cross_val_score(model, x.iloc[:, current_features], y, cv=5, scoring='r2')
        mean_score = np.mean(scores)
        print(f"Features: {len(current_features)}, CV Score: {mean_score:.4f}")

        score_improvement = mean_score - last_score

        # Stop if the score improvement is too small
        if score_improvement < min_increase:
            print("Score improvement below threshold. Stopping.")
            break

        # Update the best feature set seen so far
        best_score = mean_score
        best_features = current_features.copy()
        last_score = mean_score

        # Eliminate the feature RFE ranked last
        ranking = selector.ranking_
        least_important_feature = np.where(ranking == max(ranking))[0][0]
        current_features.pop(least_important_feature)

    print(f"Selected {len(best_features)} features with best CV score: {best_score:.4f}")
    return x.iloc[:, best_features]
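# Illustrative sketch: with a small improvement threshold the loop keeps
# eliminating features while the CV score still rises, then returns the best
# set it saw. The config construction is hypothetical:
# >>> rng = np.random.default_rng(0)
# >>> X = pd.DataFrame(rng.normal(size=(60, 5)), columns=list("abcde"))
# >>> y = pd.Series(2 * X["a"] - X["b"] + rng.normal(scale=0.1, size=60))
# >>> cfg = DataTuningAutoSetup(min_increase_for_wrapper=0.001)  # hypothetical kwargs
# >>> recursive_feature_selection_by_score(cfg, X, y).shape[1] <= X.shape[1]
# True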