In this article, I'll demonstrate a machine learning work flow based on the sklearn
library. The whole work flow resembles very much to the one based on spark. In fact, it is the sklearn
library that inspires the spark developers to make a pipeline-based framework. As of today, the pipeline from sklearn
is still far more versatile than Spark. I love the design. It's clear yet subtle, sophisticated but not hard to use.
Since the framework is somewhat similar to Spark, I will omit most of the trivial stuff, like importing package or loading data, and only focus on some special topics.
Also bear in mind that we only run on sample of the original data. sklearn
can still handle it if you dump in all 7 million data points, but will be sluggish. Let's dive directly in preprocessing step, in which we got several dates variables.
To convert strings to date time objects can be done either by the standard library, or by the pandas
. The real problem is speed. Even pd.to_datetime()
is not capable to handle large data sets. This is because under the hood the date time objects are treated like objects and cannot do a matrix manipulation. In other words, even pandas has to iterate every single element.
Here is a work-around to make it much faster: dictionary. The idea is simple: there are not so many distinct dates after all. In a dataset with a million rows, chances are most of dates are repetitive. Why do type conversion over and over again when a lookup is available? We simply find all the unique dates and convert and store them in a dictionary. Then do a lookup. To implement this idea, we only need to pass a dictionary to the a pd series, and pandas will handle the rest.
def lookup(s):
dates = {}
for date in s.unique():
dates[date] = pd.to_datetime(date)- datetime.datetime.now()
return s.map(dates)
We also regrouped the categorical data with more than 15 categories. The code is a little involved, but the logic as simple as retaining the first half of categories. By the first half, I mean the categories that accounts for 50% of all observations.
By the way, in terms of converting strings to numerals, I recommend using pd.get_dummies()
. The only catch is we got a dense vector instead of a sparse one. However, as I realized later, lots of algorithms in sklearn
does not benefit from sparse vector anyway.
def preprocessor(data_raw):
dates_list = ['PRCSS_DT', 'CNTRCT_EFFCTV_DT','PF_EFFCTV_DT']
for var in dates_list:
data_raw[var] = lookup(data_raw[var])
data_raw[var] = np.abs(data_raw[var].dt.days/365)
numerical_vars = list(data_raw.describe().columns)
categorical_vars = [var for var in data_raw.columns if var not in numerical_vars]
numerical_vars.remove("r_number")
TOO_MANY_CATS = 15
for var in categorical_vars:
column = data_raw[var]
if len(column.value_counts()) > TOO_MANY_CATS:
table = data_raw[var].value_counts().cumsum()
index_list = table[table < data_raw.shape[0]*0.5].index
print '%r how many categories left: %d' %(var, len(index_list))
data_raw.set_value(~data_raw[var].isin(index_list), var, "others")
data_v1 = pd.get_dummies(data_raw[(categorical_vars + numerical_vars)],
dummy_na= True)
return data_v1
training_set = preprocessor(training_set_comp)
This process usually takes several minutes. You do NOT want to do it over and over again. So pickle it! Pickle is like the check point in a video game, since you do not want to start over from the very beginning every time you die. So after an important and time consuming operation, pickle what you've done. You can save anything: a classifier object, a dictionary or whatever. Use the HIGHEST_PROTOCOL
for best performance, and the whole dump and load thing takes only less than 10 seconds! Here's how to do it.
import pickle
#to save
with open("after_regroup_data.pickle", 'wb') as handle:
pickle.dump(training_set, handle, pickle.HIGHEST_PROTOCOL)
# to load
with open("after_regroup_data.pickle", "rb") as handle:
training_set_from_pickle = pickle.load(handle)
Before the pipeline, let's take a look at the classifiers at hand. I put some classifiers I am gonna use in a dictionary.
param_dict_for_xgb = {'colsample_bytree': 0.80,
'gamma': 4.0504050666597102,
'learning_rate': 0.01,
'max_depth': 7,
'min_child_weight': 2,
'n_estimators': 35,
'reg_alpha': 9.61,
'subsample': 0.8}
classifier_pool = {"ada": AdaBoostClassifier(DecisionTreeClassifier(max_depth=2)),
"ridge": RidgeClassifier(),
"logistic": LogisticRegression(n_jobs = 100),
"xgb": XGBClassifier(**param_dict_for_xgb),
"rf": RandomForestClassifier( criterion='gini', n_jobs = 50, random_state= 2017),
"extra_tree": ExtraTreesClassifier(n_jobs = 100, max_features= 9, n_estimators= 300),
"knn": KNeighborsClassifier(n_jobs = 50)
}
Spoiler: I tested all of them on 10% of the data, the ada-boost algorithm gives the best result, big time. Note that the recommended way of using adaboost
is to first define a shallow decision tree classifier and use adaboost
as a wrapper. The idea of adaptive boosting is building a base classifiers iteratively based on previous classifying errors, in which the base the classifier is a decision tree.
The only catch is speed. Unlike random forest, these adaptive trees have to be trained sequentially most of the time. To put this into perspective, you need 3min 16s to finish a 10% sample to finish a loop. On the other hand, it only takes 33 seconds for a random forest.
Now here comes the pipeline. The following code demonstrate how to complete the whole thing, from imputation to classification. And the AUC is 0.554434187007 with random forest.
def fit_and_predict(train_x_input, train_y_input, test_x_input, test_y_input, classification_method, smote = True):
estimator = [ ("imputer", Imputer(missing_values='NaN', strategy= "median", axis= 0)),
('standardscaler', StandardScaler(with_mean=True, with_std=True)),
("reduce_dim", PCA(n_components = 3, random_state= 2017)),
("feature_selection", SelectFromModel(RandomForestClassifier(random_state= 2017,
n_jobs = 50), threshold = 0.001)),
("classification",classifier_pool[classification_method] )
]
pp = Pipeline(estimator)
pp.fit(train_x_input, train_y_input)
predict_ = pp.predict_proba(test_x_input)[:,1]
auc_ = auc(test_y_input, predict_, reorder= True)
print auc_
return {"prediction": predict_, "pipeline": pp}
opt2 = fit_and_predict(train_x, train_y, test_x, test_y, "rf")
We can make this pipeline even more versatile. For instance, we might not want a sequential manner in feature selection. In other words, we may want some variables from Principle component analysis and some variables from a linear model. To that end, use FeatureUnion
.
from sklearn.pipeline import FeatureUnion
from sklearn.feature_selection import SelectKBest
from sklearn.metrics import roc_curve
def fit_and_predict2(train_x_input, train_y_input, test_x_input, test_y_input, classification_method, smote = True):
estimator = [ ("preprocessing", Pipeline([
("imputer", Imputer(missing_values='NaN', strategy= "median", axis= 0)),
('standardscaler', StandardScaler(with_mean=True, with_std=True))
])),
("feature selection", FeatureUnion( [
# ('bestK', SelectKBest(k= 3)),
('pca', PCA(n_components= 20, random_state= 2017)),
("from_rf", SelectFromModel(ExtraTreesClassifier(n_jobs = 50, random_state=2017), threshold= "mean"))
])),
("classifier", Pipeline( [
("knn", KNeighborsClassifier(n_jobs = 150))
]))
]
pp = Pipeline(estimator)
pp.fit(train_x_input, train_y_input)
predict_ = pp.predict_proba(test_x_input)[:,1]
fpr, tpr, thresholds = roc_curve(test_y_input, predict_)
dd = auc(fpr, tpr)
print dd
return {"prediction": predict_, "pipeline": pp}
if __name__ == "__main__":
opt2 = fit_and_predict2(train_x, train_y, test_x, test_y, "rf")
Compare the two workflows: fit_and_predict
and fit_and_predict2
. They have the same major components. However, the first one employs a linear manner workflow with Pipeline
, one after another. For instance, it first does PCA dimension reduction, followed by a random-forest-based feature selection.
One way to verify this is to change max_features
to 4, and you will see a error message. This is because in PCA()
step, we only took 3 variables. And there will be no error message if you try the same thing on fit_and_predict2
because these two variable selection subroutines are not depending on each other because we used FeatureUnion
in this case. Feature union and pipeline behave exactly the same, with the only difference being either sequential or parallel. To recap, FeatureUnion
COMBINES the results of its subroutines but pipeline implement all the subroutines one after another.
Another nice thing of this framework is that you can always try turning off some of the subroutines and see how the result changes accordingly. For instance, you can comment out the PCA()
if you are not sure whether using it is a good idea. Or even better, you can turn off the whole module of feature selection and see what happens.
For the workflow above, if you are not sure about the parameter choices, this is how to do a grid search. It has got cross validation built in, and thus you only have to pass in the training set. You can also specify the metric to use. By the way, you need to do this ancient if __name__ == "__main__'
thing before the grid search, just to make sure multi-processing is working properly. With multi-processing I did 27 fits (3 x 3 x 3 fold) within in 1.7 minutes, which is dope.
estimator = [ ("preprocessing", Pipeline([
("imputer", Imputer(missing_values='NaN', strategy= "median", axis= 0)),
('standardscaler', StandardScaler(with_mean=True, with_std=True))
])),
("feature_selection", FeatureUnion( [
('bestK', SelectKBest(k= 3)),
('pca', PCA(n_components= 20, random_state= 2017)),
("from_rf", SelectFromModel(ExtraTreesClassifier(n_jobs = 50, random_state=2017), threshold= "mean"))
])),
("classifier", Pipeline( [
("rf", RandomForestClassifier( criterion='gini', n_jobs = 50, n_estimators= 20, random_state= 2017))
]))
]
pp = Pipeline(estimator)
parameters = {
'feature_selection__bestK__k': (3, 10, 20),
'feature_selection__pca__n_components': (3, 10, 20)
}
if __name__ == "__main__":
grid_search = GridSearchCV(pp, parameters, n_jobs= 50 , verbose=1,error_score="roc_auc")
grid_search.fit(train_x, train_y)
This is the output:
Fitting 3 folds for each of 9 candidates, totalling 27 fits
[Parallel(n_jobs=50)]: Done 12 out of 27 | elapsed: 1.4min remaining: 1.7min
[Parallel(n_jobs=50)]: Done 27 out of 27 | elapsed: 1.6min finished
[1] grid_search.best_params_
{'feature_selection__bestK__k': 3, 'feature_selection__pca__n_components': 20}
The following is a complete list of the library that I imported at the beginning of this project.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import datetime
from sklearn.preprocessing import Imputer, StandardScaler
import os
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, train_test_split
from sklearn.feature_selection import SelectFromModel
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier, ExtraTreesClassifier, AdaBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import RidgeClassifier, LogisticRegression, LogisticRegressionCV
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, roc_auc_score, auc
from xgboost.sklearn import XGBClassifier
from imblearn.over_sampling import SMOTE