Files
deap-based-automl-experimen…/src/preprocessing.py
2025-11-24 23:15:00 -04:00

139 lines
5.2 KiB
Python

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler, RobustScaler, MinMaxScaler, PowerTransformer, PolynomialFeatures
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectKBest, f_classif, f_regression
from sklearn.feature_selection import VarianceThreshold
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
class SafeSelectK(BaseEstimator, TransformerMixin):
    """Univariate top-k feature selection that degrades gracefully.

    Wraps ``SelectKBest`` but:
      * acts as a passthrough when ``k`` is None, and
      * clips ``k`` into ``[1, n_features]`` so fitting never fails when the
        transformed matrix has fewer features than requested.

    Parameters
    ----------
    task : str
        ``"classification"`` selects ``f_classif`` as the scoring function;
        any other value uses ``f_regression``.
    k : int or None
        Number of features to keep, or None for no selection.
    """

    def __init__(self, task: str, k=None):
        self.task = task
        self.k = k
        # Fitted state, populated by fit() / set_feature_names_in().
        self.selector_ = None
        self.k_effective_ = None
        self.support_mask_ = None
        self.feature_names_in_ = None
        self.feature_names_out_ = None

    def fit(self, X, y=None):
        """Fit the underlying SelectKBest, or record passthrough mode."""
        if self.k is None:
            self.selector_ = "passthrough"
            self.feature_names_out_ = self.feature_names_in_
            return self
        n_feats = X.shape[1]
        # Clamp the requested k into the valid range [1, n_feats].
        k_eff = int(min(max(1, self.k), n_feats))
        score_func = f_classif if self.task == "classification" else f_regression
        sel = SelectKBest(score_func=score_func, k=k_eff).fit(X, y)
        self.selector_ = sel
        self.k_effective_ = k_eff
        # Boolean mask over input columns marking the selected features.
        mask = np.zeros(n_feats, dtype=bool)
        mask[sel.get_support(indices=True)] = True
        self.support_mask_ = mask
        if self.feature_names_in_ is not None:
            self.feature_names_out_ = self.feature_names_in_[mask]
        return self

    def set_feature_names_in(self, names):
        """Record input feature names so fit() can compute output names."""
        self.feature_names_in_ = np.asarray(names)

    def transform(self, X):
        """Return X unchanged (passthrough) or reduced to the selected columns."""
        if self.selector_ == "passthrough":
            return X
        return self.selector_.transform(X)

    def get_feature_names_out(self, input_features=None):
        """Return the names of the selected features.

        Prefers names recorded at fit time; otherwise derives them from
        *input_features*. Bug fix: passthrough mode previously returned None
        even when *input_features* was supplied — it now echoes the given
        names, matching the sklearn ``get_feature_names_out`` contract.
        Returns None only when no name information is available at all.
        """
        if getattr(self, "feature_names_out_", None) is not None:
            return self.feature_names_out_
        if input_features is not None:
            input_features = np.asarray(input_features)
            if self.selector_ == "passthrough":
                return input_features
            if getattr(self, "support_mask_", None) is not None:
                return input_features[self.support_mask_]
        return None
class ConstantFilter(BaseEstimator, TransformerMixin):
    """Drop (near-)constant features: keep columns with variance > ``eps``.

    Parameters
    ----------
    eps : float
        Variance threshold; a column is kept only if its training-set
        variance is strictly greater than ``eps`` (default 0.0 drops
        exactly-constant columns).
    """

    def __init__(self, eps=0.0):
        self.eps = eps
        # Fitted state, populated by fit() / set_feature_names_in().
        self.mask_ = None
        self.feature_names_in_ = None
        self.feature_names_out_ = None

    def fit(self, X, y=None):
        """Compute the keep-mask from per-column variance of X."""
        X = np.asarray(X)
        var = X.var(axis=0)
        # Strict inequality: variance must exceed eps for a column to survive.
        self.mask_ = var > self.eps
        if self.feature_names_in_ is not None:
            self.feature_names_out_ = np.asarray(self.feature_names_in_)[self.mask_]
        return self

    def set_feature_names_in(self, names):
        """Record input feature names so fit() can compute output names."""
        self.feature_names_in_ = np.asarray(names)

    def get_feature_names_out(self, input_features=None):
        """Return the names of the surviving columns.

        Fix: accepts the standard sklearn ``input_features`` argument
        (previously missing, which broke name propagation through pipelines
        and was inconsistent with SafeSelectK). Falls back to synthetic
        ``f{i}`` names when no name information is available.
        """
        if self.feature_names_out_ is not None:
            return self.feature_names_out_
        if input_features is not None:
            return np.asarray(input_features)[self.mask_]
        # Fallback when names were never provided.
        return np.array([f"f{i}" for i, keep in enumerate(self.mask_) if keep])

    def transform(self, X):
        """Return X restricted to the columns kept by fit()."""
        X = np.asarray(X)
        return X[:, self.mask_]
def build_preprocessor(X_full, task, cfg, fixed_k=None, fixed_poly_degree=None):
    """Build the preprocessing Pipeline described by a config dict.

    Numeric columns are imputed, scaled, optionally expanded with polynomial
    features, and optionally variance-thresholded; categorical columns are
    imputed and one-hot encoded with categories fixed from the full dataset.
    The result is wrapped with a constant-column filter and a SafeSelectK
    feature selector.

    Parameters
    ----------
    X_full : pandas.DataFrame
        Full feature table; used both for dtype-based column splitting and
        for fixing the one-hot category vocabulary.
    task : str
        Passed to SafeSelectK ("classification" or anything else).
    cfg : dict
        Hyperparameter configuration (impute strategies, scaler name,
        "poly_degree", "use_vt"/"vt_thr", "select_k").
    fixed_k, fixed_poly_degree : optional
        Overrides for cfg["select_k"] / cfg["poly_degree"] when given.

    Returns
    -------
    sklearn.pipeline.Pipeline
        Unfitted pipeline: ColumnTransformer -> ConstantFilter -> SafeSelectK.
    """
    cat_cols = X_full.select_dtypes(include=["object", "category", "bool"]).columns.tolist()
    # NOTE(review): any non-categorical dtype (including datetimes, if present)
    # lands in num_cols — confirm upstream guarantees numeric-only remainders.
    num_cols = [c for c in X_full.columns if c not in cat_cols]

    num_imputer = SimpleImputer(strategy=cfg.get("num_impute_strategy", "median"))
    cat_imputer = SimpleImputer(strategy=cfg.get("cat_impute_strategy", "most_frequent"))

    scaler_name = cfg.get("scaler", "standard")
    if scaler_name == "standard":
        num_scaler = StandardScaler(with_mean=True, with_std=True)
    elif scaler_name == "robust":
        num_scaler = RobustScaler()
    elif scaler_name == "minmax":
        num_scaler = MinMaxScaler()
    elif scaler_name == "power":
        num_scaler = PowerTransformer(method="yeo-johnson")
    else:
        num_scaler = "passthrough"

    poly_degree = fixed_poly_degree if fixed_poly_degree is not None else cfg.get("poly_degree", 1)
    poly = PolynomialFeatures(degree=poly_degree, include_bias=False) if poly_degree > 1 else "passthrough"

    # Always fix categories from the full dataset so train/test splits see
    # the same one-hot vocabulary.
    fixed_categories = None
    if len(cat_cols) > 0:
        fixed_categories = {c: sorted(X_full[c].dropna().astype(str).unique()) for c in cat_cols}

    ohe_kwargs = dict(handle_unknown="ignore")
    if fixed_categories is not None:
        ohe_kwargs["categories"] = [fixed_categories[c] for c in cat_cols]
    # sklearn >= 1.2 renamed `sparse` to `sparse_output`. Bug fix: the old
    # code wrapped a plain dict assignment in try/except TypeError, which can
    # never raise — the TypeError comes from the OneHotEncoder constructor,
    # so the version probe must wrap the constructor call itself.
    try:
        cat_encoder = OneHotEncoder(sparse_output=False, **ohe_kwargs)
    except TypeError:
        cat_encoder = OneHotEncoder(sparse=False, **ohe_kwargs)

    num_steps = [("impute", num_imputer), ("scale", num_scaler), ("poly", poly)]
    if int(cfg.get("use_vt", 0)):
        num_steps.append(("vt", VarianceThreshold(threshold=float(cfg.get("vt_thr", 0.0)))))

    ct = ColumnTransformer([
        ("num", Pipeline(steps=num_steps), num_cols),
        ("cat", Pipeline(steps=[("impute", cat_imputer), ("oh", cat_encoder)]), cat_cols),
    ])

    select_k = fixed_k if fixed_k is not None else cfg.get("select_k", None)
    selector = SafeSelectK(task=task, k=select_k)

    pre = Pipeline([
        ("prep", ct),
        ("drop_const", ConstantFilter(eps=0.0)),
        ("select", selector),
    ])
    return pre