CI/CD for Machine Learning

🎯 Learning Objectives
Understand CI/CD for ML
Automate model training
Implement automated testing
Deploy models automatically
Version control everything
What is CI/CD for ML?
CI (Continuous Integration):
Automated testing of code changes
Model validation on every commit
Data validation
Integration tests
CD (Continuous Deployment):
Automated model deployment
Gradual rollout (canary, A/B)
Automated monitoring
Rollback on failure
ML Pipeline Stages

Code Change →    Test    →    Train     →  Evaluate  →   Deploy   →  Monitor
     ↓             ↓            ↓             ↓             ↓           ↓
  Linting     Unit tests   Model valid.   Metrics      Registry    Alerts
  Type check  Data valid.  Schema valid.  Compare      Version     Dashboard
                                          A/B test     Package
Testing ML Code

Automated testing for ML differs from traditional software testing because you need to verify not just correctness but also statistical properties of model behavior. The test suite below uses pytest fixtures to provide reusable sample data, then validates three things: the model can be trained without errors, predictions have the expected shape and value range, and accuracy meets a minimum threshold. This last test, the quality gate, is unique to ML pipelines: it prevents a poorly performing model from ever reaching production, even if the code runs without errors. In CI systems, these tests run on every commit, catching regressions immediately.
# Example test file: test_model.py
import pytest
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier

class TestModel:
    """Test suite for ML model"""

    @pytest.fixture
    def sample_data(self):
        """Provide sample data for tests"""
        iris = load_iris()
        return iris.data[:10], iris.target[:10]

    def test_model_trains(self, sample_data):
        """Test that model can be trained"""
        X, y = sample_data
        model = RandomForestClassifier(n_estimators=10, random_state=42)
        model.fit(X, y)
        assert hasattr(model, 'estimators_')
        print("✓ Model trains successfully")

    def test_model_predicts(self, sample_data):
        """Test that model makes predictions"""
        X, y = sample_data
        model = RandomForestClassifier(n_estimators=10, random_state=42)
        model.fit(X, y)
        predictions = model.predict(X)
        assert len(predictions) == len(X)
        assert all(p in [0, 1, 2] for p in predictions)
        print("✓ Model predicts successfully")

    def test_model_accuracy(self, sample_data):
        """Test minimum accuracy threshold"""
        X, y = sample_data
        model = RandomForestClassifier(n_estimators=10, random_state=42)
        model.fit(X, y)
        predictions = model.predict(X)
        accuracy = (predictions == y).mean()
        assert accuracy >= 0.7, f"Accuracy {accuracy} below threshold"
        print(f"✓ Model accuracy: {accuracy:.2%}")

# In practice you would run this with `pytest test_model.py`. For a quick
# demo we build the sample data by hand, since modern pytest raises an
# error when a fixture function is called directly:
tests = TestModel()
iris = load_iris()
data = (iris.data[:10], iris.target[:10])
tests.test_model_trains(data)
tests.test_model_predicts(data)
tests.test_model_accuracy(data)
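Beyond correctness and accuracy thresholds, ML test suites often check behavioral properties of the model. A sketch of one such check, an invariance test; the noise scale and the 95% agreement threshold are illustrative assumptions, not values from this lesson:

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier

def test_prediction_invariance():
    """Tiny input perturbations should not flip predictions."""
    iris = load_iris()
    model = RandomForestClassifier(n_estimators=50, random_state=42)
    model.fit(iris.data, iris.target)

    rng = np.random.default_rng(0)
    # Noise of ~1% of the feature scale (assumption for iris-sized features)
    noise = rng.normal(0, 0.01, size=iris.data.shape)
    baseline = model.predict(iris.data)
    perturbed = model.predict(iris.data + noise)

    agreement = (baseline == perturbed).mean()
    assert agreement >= 0.95, f"Predictions unstable: {agreement:.2%} agreement"
    return agreement

agreement = test_prediction_invariance()
print(f"Invariance check: {agreement:.2%} agreement under small noise")
```

Related behavioral tests include directional expectations (a higher input should not lower a predicted price) and slice-based accuracy checks on important subgroups.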
Data Validation

Data validation catches problems before they reach the model: malformed inputs, missing values, out-of-range features, and schema changes. The DataValidator class below enforces three checks: schema validation (correct number of features), range validation (feature values within expected bounds), and missing-value detection (no NaN or infinity). In production ML systems, data validation runs both during training (to ensure data quality) and at inference time (to reject bad inputs early). Tools like Great Expectations and Pandera extend this pattern with declarative validation rules and integration with orchestrators like Airflow.
import numpy as np
from typing import Dict, Any

class DataValidator:
    """Validate input data quality"""

    def __init__(self, schema: Dict[str, Any]):
        self.schema = schema

    def validate_schema(self, data: np.ndarray) -> bool:
        """Check data matches expected schema"""
        expected_features = self.schema['n_features']
        if data.ndim != 2:
            raise ValueError(f"Expected 2D array, got {data.ndim}D")
        if data.shape[1] != expected_features:
            raise ValueError(
                f"Expected {expected_features} features, got {data.shape[1]}"
            )
        return True

    def validate_ranges(self, data: np.ndarray) -> bool:
        """Check feature values are in expected ranges"""
        for i, (min_val, max_val) in enumerate(self.schema['feature_ranges']):
            feature_data = data[:, i]
            if feature_data.min() < min_val or feature_data.max() > max_val:
                raise ValueError(
                    f"Feature {i} out of range: "
                    f"[{feature_data.min():.2f}, {feature_data.max():.2f}] "
                    f"not in [{min_val}, {max_val}]"
                )
        return True

    def validate_missing(self, data: np.ndarray) -> bool:
        """Check for missing values"""
        if np.isnan(data).any():
            raise ValueError("Data contains NaN values")
        if np.isinf(data).any():
            raise ValueError("Data contains infinite values")
        return True

    def validate_all(self, data: np.ndarray) -> Dict[str, bool]:
        """Run all validations"""
        results = {
            'schema': self.validate_schema(data),
            'ranges': self.validate_ranges(data),
            'missing': self.validate_missing(data)
        }
        return results

# Example usage
schema = {
    'n_features': 4,
    'feature_ranges': [(0, 10), (0, 10), (0, 10), (0, 10)]
}
validator = DataValidator(schema)

# Valid data
valid_data = np.random.rand(10, 4) * 10
results = validator.validate_all(valid_data)
print("Validation results:", results)
print("✓ All validations passed")
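A validator is only useful if it actually rejects bad inputs. This standalone sketch re-declares the missing-value check so it runs on its own, then confirms that injected NaN data raises the expected error:

```python
import numpy as np

def validate_missing(data: np.ndarray) -> bool:
    """Same missing-value check as DataValidator, shown standalone."""
    if np.isnan(data).any():
        raise ValueError("Data contains NaN values")
    if np.isinf(data).any():
        raise ValueError("Data contains infinite values")
    return True

bad = np.random.rand(10, 4) * 10
bad[0, 0] = np.nan  # inject a missing value

try:
    validate_missing(bad)
    print("Unexpectedly passed")
except ValueError as e:
    print("Rejected:", e)
```

In CI, this failure path is exactly what you test: the pipeline should stop, not silently train on corrupted data.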
GitHub Actions Workflow

GitHub Actions automates the ML pipeline by defining jobs that run on every push or pull request. The workflow below has three sequential stages: test (run pytest and validate data), train (train the model and evaluate it), and deploy (build a Docker image and push to a registry). The needs keyword enforces the dependency chain: training only starts after tests pass, and deployment only happens on the main branch after training succeeds. The model artifact is passed between jobs using actions/upload-artifact and actions/download-artifact, keeping each job stateless.
github_workflow = '''
name: ML Pipeline

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run tests
        run: |
          pytest tests/ --cov=. --cov-report=xml
      - name: Validate data schema
        run: |
          python scripts/validate_data.py

  train:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Train model
        run: python scripts/train_model.py
      - name: Evaluate model
        run: python scripts/evaluate_model.py
      - name: Upload model artifact
        uses: actions/upload-artifact@v3
        with:
          name: model
          path: models/

  deploy:
    needs: train
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - name: Download model
        uses: actions/download-artifact@v3
        with:
          name: model
          path: models/
      - name: Build Docker image
        run: docker build -t ${{ secrets.DOCKER_USERNAME }}/ml-api:${{ github.sha }} .
      - name: Push to registry
        run: |
          echo ${{ secrets.DOCKER_PASSWORD }} | docker login -u ${{ secrets.DOCKER_USERNAME }} --password-stdin
          docker push ${{ secrets.DOCKER_USERNAME }}/ml-api:${{ github.sha }}
'''
print("GitHub Actions Workflow:")
print(github_workflow)
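The workflow's Validate data schema step runs scripts/validate_data.py, which this lesson does not show. A hypothetical minimal sketch follows; the EXPECTED_FEATURES constant and the synthetic loader are assumptions, and a real script would read the actual training set from disk or a feature store. CI treats the non-zero exit code as a failed job:

```python
# scripts/validate_data.py (hypothetical sketch)
import numpy as np

EXPECTED_FEATURES = 4  # assumed schema for this lesson's iris-style data

def load_training_data() -> np.ndarray:
    """Placeholder loader: synthetic data keeps the sketch runnable;
    a real script would read the training set from disk or a feature store."""
    rng = np.random.default_rng(42)
    return rng.random((100, EXPECTED_FEATURES)) * 10

def main() -> int:
    data = load_training_data()
    errors = []
    if data.ndim != 2 or data.shape[1] != EXPECTED_FEATURES:
        errors.append(f"bad shape {data.shape}")
    if np.isnan(data).any() or np.isinf(data).any():
        errors.append("missing or infinite values")
    if errors:
        print("Data validation FAILED:", "; ".join(errors))
        return 1  # non-zero exit fails the CI job
    print("Data validation passed")
    return 0

exit_code = main()
print("Exit code:", exit_code)
```

As a real script you would end it with `sys.exit(main())` so the shell sees the exit status.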
Model Training Script

A production training script goes beyond model.fit(): it loads data, trains, evaluates, saves artifacts, and enforces quality gates. The train_model() function below saves both the serialized model and a metrics.json file that records accuracy, F1 score, and metadata. The quality gate at the end raises an exception if accuracy falls below the minimum threshold, which causes the CI job to fail and prevents deployment. This pattern ensures that even if someone accidentally introduces a data bug or a hyperparameter regression, the pipeline halts before a degraded model reaches production.
# scripts/train_model.py (example)
import os
import json
import joblib
from datetime import datetime
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

def train_model():
    """Train and save model"""
    print("Loading data...")
    iris = load_iris()
    X_train, X_test, y_train, y_test = train_test_split(
        iris.data, iris.target, test_size=0.2, random_state=42
    )

    print("Training model...")
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    print("Evaluating model...")
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")

    # Save model (create the output directory if it does not exist)
    os.makedirs('models', exist_ok=True)
    model_path = 'models/model.pkl'
    joblib.dump(model, model_path)
    print(f"Model saved to {model_path}")

    # Save metrics
    metrics = {
        'accuracy': accuracy,
        'f1_score': f1,
        'timestamp': datetime.now().isoformat(),
        'n_samples': len(X_train)
    }
    with open('models/metrics.json', 'w') as f:
        json.dump(metrics, f, indent=2)
    print("Metrics saved")

    # Minimum quality gate
    MIN_ACCURACY = 0.85
    if accuracy < MIN_ACCURACY:
        raise ValueError(
            f"Model accuracy {accuracy:.4f} below threshold {MIN_ACCURACY}"
        )

    print("✓ Training complete, quality gates passed")
    return model, metrics

# Run training
model, metrics = train_model()
print("\nFinal Metrics:", json.dumps(metrics, indent=2))
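The workflow's Evaluate model step (scripts/evaluate_model.py) is also not shown in this lesson. One common pattern is to compare the freshly written metrics against a stored baseline and fail the job on regression; the tolerance value and the inline demo metrics below are assumptions, and a real script would load models/metrics.json plus a baseline from the model registry:

```python
# scripts/evaluate_model.py (hypothetical sketch)
TOLERANCE = 0.01  # allow at most 1 point of accuracy regression (assumption)

def compare_to_baseline(current: dict, baseline: dict,
                        tolerance: float = TOLERANCE) -> bool:
    """Raise if the candidate model's accuracy regressed beyond tolerance."""
    drop = baseline["accuracy"] - current["accuracy"]
    if drop > tolerance:
        raise ValueError(
            f"Accuracy regressed by {drop:.4f} "
            f"({baseline['accuracy']:.4f} -> {current['accuracy']:.4f})"
        )
    return True

# Inline demo; real metrics would come from models/metrics.json
baseline = {"accuracy": 0.9500}
current = {"accuracy": 0.9667}
print("Comparison passed:", compare_to_baseline(current, baseline))
```

Raising an exception is enough: the non-zero exit code fails the train job, so the deploy job never runs.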
Automated Deployment Strategy

The deployment script ties together the entire pipeline with a canary rollout strategy. After passing smoke tests, the new model version receives 10% of traffic via Kubernetes. A five-minute monitoring window checks the error rate from Prometheus; if it exceeds 1%, the deployment is automatically rolled back. Only after the canary passes health checks does traffic ramp to 100%. This automation eliminates human error from the deployment process while providing a safety net: bad models are caught within minutes rather than after user complaints.
deployment_script = r'''
#!/bin/bash
# scripts/deploy.sh
set -e  # Exit on error

MODEL_VERSION=$1
ENVIRONMENT=$2

if [ -z "$MODEL_VERSION" ] || [ -z "$ENVIRONMENT" ]; then
    echo "Usage: deploy.sh <version> <environment>"
    exit 1
fi

echo "Deploying model $MODEL_VERSION to $ENVIRONMENT"

# 1. Run smoke tests
echo "Running smoke tests..."
pytest tests/smoke/ --model-version="$MODEL_VERSION"

# 2. Deploy with canary strategy
# NOTE: illustrative only; vanilla kubectl has no --canary-weight flag.
# Real traffic splitting comes from Argo Rollouts, Flagger, or a service mesh.
echo "Starting canary deployment (10% traffic)..."
kubectl set image deployment/ml-api \
    ml-api=myregistry/ml-api:"$MODEL_VERSION" \
    --canary-weight=10

# 3. Monitor for 5 minutes
echo "Monitoring canary for 5 minutes..."
sleep 300

# 4. Check metrics
ERROR_RATE=$(curl -s "http://prometheus:9090/api/v1/query?query=error_rate" | jq -r '.data.result[0].value[1]')
if (( $(echo "$ERROR_RATE > 0.01" | bc -l) )); then
    echo "Error rate too high, rolling back..."
    kubectl rollout undo deployment/ml-api
    exit 1
fi

# 5. Promote to 100%
echo "Promoting to 100% traffic..."
kubectl set image deployment/ml-api \
    ml-api=myregistry/ml-api:"$MODEL_VERSION" \
    --canary-weight=100

echo "✓ Deployment complete"
'''
print("Deployment Script:")
print(deployment_script)
Best Practices

Version Everything
- Code (Git)
- Data (DVC, S3 versioning)
- Models (MLflow, model registry)
- Dependencies (requirements.txt with pinned versions)

Automated Testing
- Unit tests for code
- Data validation tests
- Model performance tests
- Integration tests

Quality Gates
- Minimum accuracy threshold
- Maximum latency
- Code coverage
- Linting/formatting

Gradual Rollout
- Start with canary (5-10%)
- Monitor key metrics
- Increase gradually
- Auto-rollback on issues

Documentation
- README with setup instructions
- API documentation
- Model cards
- Runbooks for incidents
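The "maximum latency" quality gate in the list above can itself be an automated CI check. A sketch, timing single-row inference; the 50 ms budget and run count are illustrative assumptions, and real suites would also account for machine-to-machine variance:

```python
# Latency quality gate: fail CI if average single-row inference time
# exceeds a budget. The 50 ms budget is an illustrative assumption.
import time
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier

LATENCY_BUDGET_MS = 50.0

iris = load_iris()
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(iris.data, iris.target)

row = iris.data[:1]
n_runs = 50
model.predict(row)  # warm-up call, excluded from timing
start = time.perf_counter()
for _ in range(n_runs):
    model.predict(row)
avg_ms = (time.perf_counter() - start) / n_runs * 1000

assert avg_ms <= LATENCY_BUDGET_MS, f"Latency {avg_ms:.1f} ms over budget"
print(f"Average inference latency: {avg_ms:.1f} ms (budget {LATENCY_BUDGET_MS} ms)")
```

The same pattern works for model size on disk or memory footprint: measure, compare to a budget, and fail the job on breach.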
Key Takeaways

✓ Automate testing and deployment
✓ Validate data and models automatically
✓ Use quality gates to prevent bad deployments
✓ Implement gradual rollout strategies
✓ Version and document everything