sklearn Pipelines: The Right Way to Build ML Workflows
Pipelines prevent data leakage, make code reproducible, and enable one-line cross-validation. Learn to combine preprocessing and modeling into a single object.
1. Why Pipelines? The Data Leakage Problem
Data leakage happens when information from the test set "leaks" into training, leading to over-optimistic evaluation scores that don't generalize.
The most common form: fitting a scaler on the entire dataset before splitting.
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
# Synthetic dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
# WRONG: Scaler fitted on ALL data before split
scaler = StandardScaler()
X_scaled_wrong = scaler.fit_transform(X) # uses test data statistics!
X_train_wrong, X_test_wrong, y_train, y_test = train_test_split(
X_scaled_wrong, y, test_size=0.2, random_state=42
)
model = LogisticRegression(random_state=42)
model.fit(X_train_wrong, y_train)
wrong_score = accuracy_score(y_test, model.predict(X_test_wrong))
print(f"Wrong approach (leaky) accuracy: {wrong_score:.4f}")
# CORRECT: Scaler fitted only on train data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train) # fit only on train
X_test_scaled = scaler.transform(X_test) # transform only
model = LogisticRegression(random_state=42)
model.fit(X_train_scaled, y_train)
correct_score = accuracy_score(y_test, model.predict(X_test_scaled))
print(f"Correct approach (no leakage) accuracy: {correct_score:.4f}")
print(f"\nDifference: {wrong_score - correct_score:.4f} (leaky inflates score)")
The difference looks small here, but with small datasets or aggressive preprocessing (e.g., feature selection), leakage can inflate accuracy by 5–20%.
Pipelines eliminate this problem entirely by ensuring fit only ever sees training data.
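A leakage-safe version of the comparison above takes one line with a pipeline: wrapping the scaler and model together means every cross-validation fold refits the scaler on that fold's training portion only. A minimal sketch (`make_pipeline` auto-names the steps):

```python
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)

# make_pipeline generates step names from the class names:
# 'standardscaler' and 'logisticregression'
cv_pipe = make_pipeline(StandardScaler(), LogisticRegression(random_state=42))

# In each fold, the scaler is fit on the training split only,
# then applied unchanged to the held-out split
scores = cross_val_score(cv_pipe, X, y, cv=5, scoring='accuracy')
print(f"Leak-free CV accuracy: {scores.mean():.4f} +/- {scores.std():.4f}")
```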
2. Building Your First Pipeline
We'll use the Titanic dataset from seaborn, a classic ML dataset with mixed types and missing values.
import seaborn as sns
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
# Load Titanic from seaborn
titanic = sns.load_dataset('titanic')
print(titanic[['age', 'fare', 'pclass', 'survived']].head())
print(f"\nMissing values:\n{titanic[['age', 'fare']].isnull().sum()}")
# Use only numeric features for this simple example
features = ['age', 'fare', 'pclass']
target = 'survived'
df = titanic[features + [target]].dropna(subset=[target])
X = df[features]
y = df[target].astype(int)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Build the pipeline: steps are (name, estimator) tuples
pipe = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')), # step 1: fill NaNs
('scaler', StandardScaler()), # step 2: normalize
('clf', LogisticRegression(random_state=42)) # step 3: model
])
# One call handles ALL steps, with zero leakage
pipe.fit(X_train, y_train)
y_pred = pipe.predict(X_test)
print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
print("\n", classification_report(y_test, y_pred))
Key insight: pipe.fit(X_train, y_train) calls fit_transform on each transformer and fit on the final estimator. During pipe.predict(), only transform is called β no refitting.
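Once fitted, each step stays inspectable through named_steps or dict-style indexing, which is handy for sanity-checking what the transformers actually learned. A self-contained sketch on synthetic data with missing values (variable names are illustrative):

```python
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(42)
toy_X = rng.normal(size=(100, 3))
toy_X[rng.random(toy_X.shape) < 0.1] = np.nan   # sprinkle ~10% missing values
toy_y = (rng.random(100) > 0.5).astype(int)

toy_pipe = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression(random_state=42)),
]).fit(toy_X, toy_y)

# Step names come from the (name, estimator) tuples
print(toy_pipe.named_steps['imputer'].statistics_)  # learned medians, one per column
print(toy_pipe['scaler'].mean_)                     # per-feature means after imputation
print(toy_pipe['clf'].coef_.shape)                  # coefficients of the final model
```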
3. ColumnTransformer: Handling Mixed Data Types
Real datasets have numeric AND categorical columns. ColumnTransformer lets you apply different preprocessing to each column type.
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.ensemble import RandomForestClassifier
# Use more features including categorical
features_num = ['age', 'fare', 'pclass']
features_cat = ['sex', 'embarked']
all_features = features_num + features_cat
df2 = titanic[all_features + ['survived']].copy()
df2 = df2.dropna(subset=['survived'])
X2 = df2[all_features]
y2 = df2['survived'].astype(int)
X_train2, X_test2, y_train2, y_test2 = train_test_split(
X2, y2, test_size=0.2, random_state=42, stratify=y2
)
# Define preprocessing for each column type
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='most_frequent')),
('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# Combine into a ColumnTransformer
preprocessor = ColumnTransformer(transformers=[
('num', numeric_transformer, features_num),
('cat', categorical_transformer, features_cat)
])
# Full pipeline: preprocessor + model
full_pipe = Pipeline(steps=[
('preprocessor', preprocessor),
('clf', RandomForestClassifier(n_estimators=100, random_state=42))
])
full_pipe.fit(X_train2, y_train2)
print(f"Accuracy with full feature set: {full_pipe.score(X_test2, y_test2):.4f}")
print("\nPipeline structure:")
print(full_pipe)
4. Custom Transformers
You can write your own sklearn-compatible transformers by inheriting from BaseEstimator and TransformerMixin. Implement fit and transform, and TransformerMixin gives you fit_transform for free.
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import PowerTransformer
import numpy as np
class SkewnessReducer(BaseEstimator, TransformerMixin):
    """
    Apply Yeo-Johnson transform only to columns with |skewness| > threshold.
    Columns below threshold are left untouched.
    """
    def __init__(self, threshold=0.5):
        self.threshold = threshold

    def fit(self, X, y=None):
        X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X
        self.skewed_cols_ = X.columns[X.skew().abs() > self.threshold].tolist()
        # Fit a PowerTransformer only on skewed columns
        if self.skewed_cols_:
            self.pt_ = PowerTransformer(method='yeo-johnson')
            self.pt_.fit(X[self.skewed_cols_])
        return self

    def transform(self, X, y=None):
        X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X.copy()
        if self.skewed_cols_:
            X[self.skewed_cols_] = self.pt_.transform(X[self.skewed_cols_])
        return X.values
# Test it
from sklearn.datasets import make_regression
X_demo, _ = make_regression(n_samples=200, n_features=4, random_state=42)
X_demo_df = pd.DataFrame(X_demo, columns=['a', 'b', 'c', 'd'])
# Add a skewed column
X_demo_df['e'] = np.exp(np.random.default_rng(42).standard_normal(200))  # log-normal, highly skewed (seeded for reproducibility)
reducer = SkewnessReducer(threshold=0.5)
reducer.fit(X_demo_df)
print(f"Skewed columns found: {reducer.skewed_cols_}")
X_transformed = reducer.transform(X_demo_df)
print(f"Output shape: {X_transformed.shape}")
print("Custom transformer works seamlessly in a Pipeline!")
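To back that up, here is a different minimal custom transformer dropped into a full pipeline end to end. ColumnClipper is a made-up example (not part of sklearn): it learns per-column quantile bounds in fit and clips to them in transform.

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression

class ColumnClipper(BaseEstimator, TransformerMixin):
    """Clip each column to the [low, high] quantiles learned during fit."""
    def __init__(self, low=0.01, high=0.99):
        self.low = low
        self.high = high

    def fit(self, X, y=None):
        X = np.asarray(X)
        # Learn the clipping bounds column by column
        self.low_, self.high_ = np.quantile(X, [self.low, self.high], axis=0)
        return self

    def transform(self, X):
        return np.clip(np.asarray(X), self.low_, self.high_)

X_toy, y_toy = make_regression(n_samples=200, n_features=4, noise=5.0, random_state=42)
clip_pipe = Pipeline(steps=[('clip', ColumnClipper()), ('reg', LinearRegression())])
clip_pipe.fit(X_toy, y_toy)
print(f"Training R^2 with clipping step: {clip_pipe.score(X_toy, y_toy):.3f}")
```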
5. Pipeline + GridSearchCV: The Power Combo
The __ (double underscore) notation lets you set parameters on any step inside the pipeline. This makes hyperparameter tuning dead simple.
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC
# Build a pipeline with SVC
pipe_svc = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
('clf', SVC(random_state=42))
])
# Access params with: <step_name>__<param_name>
param_grid = {
'imputer__strategy': ['mean', 'median'], # tune imputer
'clf__C': [0.1, 1.0, 10.0], # tune model
'clf__kernel': ['rbf', 'linear'],
}
grid_search = GridSearchCV(
pipe_svc,
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
# Use the smaller numeric-only dataset
grid_search.fit(X_train, y_train)
print(f"Best params: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")
print(f"Test score: {grid_search.score(X_test, y_test):.4f}")
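The double-underscore path also drills through nested objects, e.g. into the imputer inside the num branch of a ColumnTransformer. A self-contained sketch (the data is random, so treat the scores as noise; the point is the parameter paths):

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV

rng = np.random.default_rng(0)
demo_df = pd.DataFrame({
    'age': rng.normal(40, 10, 200),
    'city': rng.choice(['a', 'b', 'c'], 200),
})
demo_df.loc[:20, 'age'] = np.nan            # introduce missing values to impute
demo_y = (rng.random(200) > 0.5).astype(int)

demo_pre = ColumnTransformer(transformers=[
    ('num', Pipeline(steps=[('imputer', SimpleImputer()),
                            ('scaler', StandardScaler())]), ['age']),
    ('cat', OneHotEncoder(handle_unknown='ignore'), ['city']),
])
demo_pipe = Pipeline(steps=[('preprocessor', demo_pre),
                            ('clf', LogisticRegression())])

# Path: step name -> transformer name -> inner step name -> parameter
demo_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'clf__C': [0.1, 1.0],
}
demo_search = GridSearchCV(demo_pipe, demo_grid, cv=3).fit(demo_df, demo_y)
print(demo_search.best_params_)
```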
6. Saving and Loading Pipelines with joblib
import joblib
import os
import tempfile
# Save the best pipeline from GridSearchCV
best_pipe = grid_search.best_estimator_
with tempfile.TemporaryDirectory() as tmpdir:
    model_path = os.path.join(tmpdir, 'titanic_pipeline.joblib')
    # Save
    joblib.dump(best_pipe, model_path)
    size_kb = os.path.getsize(model_path) / 1024
    print(f"Pipeline saved to: {model_path}")
    print(f"File size: {size_kb:.1f} KB")
    # Load
    loaded_pipe = joblib.load(model_path)
    # Verify identical predictions
    preds_original = best_pipe.predict(X_test)
    preds_loaded = loaded_pipe.predict(X_test)
    print(f"\nPredictions match: {np.array_equal(preds_original, preds_loaded)}")
print("\nProduction usage:")
print(" joblib.dump(pipe, 'model.joblib') # Save once")
print(" pipe = joblib.load('model.joblib') # Load anywhere")
print(" pipe.predict(new_data) # Preprocessing included!")
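One production caveat: joblib files are not guaranteed to load across scikit-learn versions, so a common pattern is to store the training version alongside the estimator and check it on load. A sketch (the bundle keys are our own convention, not a joblib feature):

```python
import os
import tempfile
import joblib
import sklearn
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification

X_ver, y_ver = make_classification(n_samples=100, random_state=0)
ver_model = LogisticRegression().fit(X_ver, y_ver)

# Bundle the estimator with the version that produced it
bundle = {'model': ver_model, 'sklearn_version': sklearn.__version__}
with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, 'bundle.joblib')
    joblib.dump(bundle, path)
    loaded = joblib.load(path)

# On load, warn if the runtime version differs from the training version
if loaded['sklearn_version'] != sklearn.__version__:
    print("Warning: model was trained under a different scikit-learn version")
print(f"Model trained under scikit-learn {loaded['sklearn_version']}")
```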
7. Common Gotchas
| Gotcha | Problem | Fix |
|---|---|---|
| Fitting the scaler before the split | Data leakage | Always use a Pipeline |
| Calling fit_transform on test data | Leakage | Pipeline calls only transform at predict time |
| Forgetting handle_unknown='ignore' on OneHotEncoder | Crashes on new categories in prod | Always set this |
| Saving only the model (not the scaler) | Predictions wrong in prod | Save the whole pipeline |
| Predicting on a single sample as a 1D array | Shape issues | Use X.reshape(1, -1) or a one-row DataFrame |
Tip: Run pipe.get_params() to see all tunable parameters; it's the fastest way to build your param_grid.
# Useful debugging: list all parameters
for key in sorted(full_pipe.get_params().keys()):
    print(f"  {key}")
Exercises
1. Leakage audit: Take the "wrong" approach code from Section 1 and measure how much the accuracy changes across 10 different random seeds. Is the leakage consistent?
2. Pipeline extension: Add a SelectKBest feature selector as a step between preprocessor and clf in full_pipe. Tune k in GridSearchCV.
3. Custom transformer: Write a CyclicalEncoder that transforms a month column (1–12) into sin and cos features. Make it sklearn-compatible (inherits BaseEstimator, TransformerMixin).
4. Production pipeline: Build a pipeline on the diamonds seaborn dataset (predict price). Save it with joblib, reload it, and verify predictions match. Include both numeric and categorical columns.
5. Compare strategies: Build two pipelines, one with SimpleImputer(strategy='mean') and one with KNNImputer(n_neighbors=5). Which performs better on Titanic? Use cross-validation to compare fairly.