
Step 3: Clustering + Model Exploration

In this notebook, the dataset is visualized and clustered, and several classification models are explored.

First, dimensionality reduction is performed using PCA and UMAP. This serves two purposes: visualizing the data, and potentially speeding up training and reducing overfitting.

Data reduced to 3 dimensions is visualized to get a better understanding of how it is distributed and whether there are any intrinsic patterns. This helps in forming the classification strategy. The interactive plots are stored as HTML files under the reports/figures directory.

Since human-understandable visualization is limited to 3 dimensions, clustering methods are used to look for inherent patterns and groupings. If well-defined clusters emerge, these methods could also be used for classification.

Finally, some models are implemented and tested: SVM, an AdaBoost SVM ensemble, XGBoost, and RGF. The models may or may not be able to learn well from a dataset reduced using PCA or UMAP, but it is worth trying.

The following models will be explored in future versions:

1. Neural network
2. Classifiers based on embeddings learned from semi-supervised UMAP

[1]
import os
import gc
import datetime
import cloudpickle

import scipy
import numpy as np

import seaborn as sns
import plotly

import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client
from joblib import parallel_backend

from umap import UMAP
from sklearn.decomposition import PCA
from dask_ml.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

from sklearn.neighbors import DistanceMetric
from sklearn.cluster import KMeans, SpectralClustering, DBSCAN

from sklearn.svm import SVC
from rgf.sklearn import RGFClassifier
from dask_ml.xgboost import XGBClassifier
from sklearn.ensemble import AdaBoostClassifier

from sklearn.metrics import (
    confusion_matrix,
    homogeneity_completeness_v_measure,
    precision_recall_fscore_support,
)

# Display all cell outputs
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
[3]
# start a local Dask client
# use the dask dashboard at http://localhost:8787/status
# NOTE: ProgressBar does not show up when the client is initialized
client = Client()
/home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/distributed/bokeh/core.py:74: UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the diagnostics dashboard on a random port instead.
[4]
# inferred int64 types cause a type mismatch (int vs float) error when dask sees a null value
# null values cannot be interpreted as ints
custom_dtypes = {
    "date": "object",
    "serial_number": "object",
    "capacity_bytes": "float32",
    "failure": "float32",
    "smart_1_normalized": "float32",
    "smart_1_raw": "float32",
    "smart_5_normalized": "float32",
    "smart_5_raw": "float32",
    "smart_7_normalized": "float32",
    "smart_7_raw": "float32",
    "smart_10_normalized": "float32",
    "smart_10_raw": "float32",
    "smart_184_normalized": "float32",
    "smart_184_raw": "float32",
    "smart_187_normalized": "float32",
    "smart_187_raw": "float32",
    "smart_188_normalized": "float32",
    "smart_188_raw": "float32",
    "smart_189_normalized": "float32",
    "smart_189_raw": "float32",
    "smart_190_normalized": "float32",
    "smart_190_raw": "float32",
    "smart_193_normalized": "float32",
    "smart_193_raw": "float32",
    "smart_194_normalized": "float32",
    "smart_194_raw": "float32",
    "smart_197_normalized": "float32",
    "smart_197_raw": "float32",
    "smart_198_normalized": "float32",
    "smart_198_raw": "float32",
    "smart_240_normalized": "float32",
    "smart_240_raw": "float32",
    "smart_241_normalized": "float32",
    "smart_241_raw": "float32",
    "smart_242_normalized": "float32",
    "smart_242_raw": "float32",
}
[5]
# read all the cleaned seagate data into one dataframe
DATA_DIR = "/home/kachauha/Downloads/data_Q4_2018_clean"
seagate_df = dd.read_csv(os.path.join(DATA_DIR, "*.csv"), dtype=custom_dtypes)
[6]
# get the serial numbers for all the failed hard drives
failed_serials = seagate_df[seagate_df["failure"] == 1]["serial_number"]

Collapse Time Dimension

Most of the classification models discussed cannot be directly applied to time series data, especially considering that the time series may be of different lengths. Therefore, time series are reduced to fixed-length feature vectors.

The simplest method of collapsing the time dimension is implemented for now. See https://trello.com/c/kIoGk9T1 for a full list of representation techniques that could be explored.

[7]
# collapse time dim, drop cols that are unnecessary for training data
seagate_df_grouped = seagate_df.drop(["date", "failure"], axis=1).groupby(
    "serial_number"
)

# characterize each time series by simple stats - mean, std, and length in days (max/min left commented out below)
means = seagate_df_grouped.mean()
means = means.rename(columns={col: "mean_" + col for col in means.columns})

stds = seagate_df_grouped.std(ddof=0)
stds = stds.rename(columns={col: "std_" + col for col in stds.columns})
stds = stds.fillna(0)  # FIXME: std returns nans even for ddof=0

# maxs = seagate_df_grouped.max()
# maxs = maxs.rename(columns={col: 'max_' + col for col in maxs.columns})

# mins = seagate_df_grouped.min()
# mins = mins.rename(columns={col: 'min_' + col for col in mins.columns})

days = seagate_df_grouped.size().to_frame("days")
[8]
# join stats into one df
X_df = means.merge(right=stds, left_index=True, right_index=True)
# X_df = X_df.merge(right=maxs, left_index=True, right_index=True)
# X_df = X_df.merge(right=mins, left_index=True, right_index=True)
X_df = X_df.merge(right=days, left_index=True, right_index=True)
[9]
# convert to a dask array - some algorithms don't like dataframes
X_arr = X_df.to_dask_array(lengths=True)
[ ]
# sanity check - ensure no nans
X_df.isna().any().any().compute()
[10]
# clean up memory
del means
del stds
del days
gc.collect()
1181
[11]
# get ground truth labels
# NOTE: dask-xgboost needs this to be a dask object
# NOTE: if not converted to a dask array, its shape shows up as nan
Y_arr = da.from_array(X_df.index.isin(failed_serials.values.compute()).compute())
Y_arr.shape
(82993,)
[12]
# True if the drive failed; used for boolean indexing into the feature arrays
failed_idx = (Y_arr == 1).compute()
failed_idx.sum()
341

Preprocess

[13]
std_scaler = StandardScaler()
# rob_scaler = RobustScaler()

X_arr = std_scaler.fit_transform(X_arr)
# X = rob_scaler.fit_transform(X)

Save preprocessed

[20]
da.to_npy_stack("preprocessed/X_arr", X_arr)
da.to_npy_stack("preprocessed/Y_arr", Y_arr)

Visualize

[23]
# vanilla pca for visualization in 3d
with parallel_backend("dask"):
    pcs = PCA(n_components=3)
    X_pca = pcs.fit_transform(X_arr)
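
Before plotting, it is worth a quick check of how much of the total variance the three principal components actually capture (a small sanity-check sketch):

[ ]
# fraction of total variance explained by each of the 3 components
print(pcs.explained_variance_ratio_)
print("total:", pcs.explained_variance_ratio_.sum())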
[57]
# plot
working_pca = dict(
    mode="markers",
    name="Working",
    type="scatter3d",
    x=X_pca[~failed_idx, 0],
    y=X_pca[~failed_idx, 1],
    z=X_pca[~failed_idx, 2],
    marker=dict(size=2, color="rgb(0, 0, 200)"),
)

failed_pca = dict(
    mode="markers",
    name="Failed",
    type="scatter3d",
    x=X_pca[failed_idx, 0],
    y=X_pca[failed_idx, 1],
    z=X_pca[failed_idx, 2],
    marker=dict(size=2, color="rgb(200, 0, 0)"),
)

layout_pca = dict(
    title="Simple Reduction Euclidean PCA",
    scene=dict(
        xaxis=dict(zeroline=False),
        yaxis=dict(zeroline=False),
        zaxis=dict(zeroline=False),
    ),
)
[56]
fig = dict(data=[working_pca, failed_pca], layout=layout_pca)
plotly.offline.plot(
    fig,
    filename="../../../reports/figures/pca/seagate_simple_euclidean.html",
    auto_open=True,
)
'seagate_pca_scatter.html'
[ ]
#################################################################
#   KERNEL PCA RUNS OUT OF MEMORY for rbf, sigmoid, cosine      #
#################################################################
# # euclidean is not good enough for the dims we have. go nonlinear
# with parallel_backend('dask'):
#     kpca = KernelPCA(n_components=3, kernel='cosine', random_state=42)
#     X_kpca = kpca.fit_transform(X_arr)
[ ]
# # plot
# working_kpca = dict(
#     mode = "markers",
#     name = "Working",
#     type = "scatter3d",
#     x = X_kpca[~failed_idx, 0],
#     y = X_kpca[~failed_idx, 1],
#     z = X_kpca[~failed_idx, 2],
#     marker = dict( size=2, color="rgb(0, 0, 200)" )
# )

# failed_kpca = dict(
#     mode = "markers",
#     name = "Failed",
#     type = "scatter3d",
#     x = X_kpca[failed_idx, 0],
#     y = X_kpca[failed_idx, 1],
#     z = X_kpca[failed_idx, 2],
#     marker = dict( size=2, color="rgb(200, 0, 0)" )
# )

# layout_kpca = dict(
#     title = 'Simple Reduction RBF PCA',
#     scene = dict(
#         xaxis = dict( zeroline=False ),
#         yaxis = dict( zeroline=False ),
#         zaxis = dict( zeroline=False ),
#     )
# )
[ ]
# fig = dict( data=[working_kpca, failed_kpca], layout=layout_kpca )
# plotly.offline.plot(fig, filename='seagate_kpca_rbf.html', auto_open=True)
[40]
# visualize using umap (tsne may be too expensive)
with parallel_backend("dask"):
    ucs = UMAP(n_components=3, metric="canberra", n_neighbors=32, random_state=42)
    X_umap = ucs.fit_transform(X_arr, y=da.where(Y_arr == 0, -1, Y_arr))
/home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/numba/compiler.py:588: NumbaPerformanceWarning: The keyword argument 'parallel=True' was specified but no transformation for parallel execution was possible. To find out why, try turning on parallel diagnostics, see http://numba.pydata.org/numba-doc/latest/user/parallel.html#diagnostics for help. File "../../.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/nndescent.py", line 47: @numba.njit(parallel=True) def nn_descent( ^ /home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/umap_.py:569: NumbaWarning: Compilation is falling back to object mode WITH looplifting enabled because Function "categorical_simplicial_set_intersection" failed type inference due to: non-precise type pyobject [1] During: typing of argument at /home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/umap_.py (599) File "../../.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/umap_.py", line 599: def categorical_simplicial_set_intersection( <source elided> """ simplicial_set = simplicial_set.tocoo() ^ /home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/numba/compiler.py:725: NumbaWarning: Function "categorical_simplicial_set_intersection" was compiled in object mode without forceobj=True. File "../../.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/umap_.py", line 570: @numba.jit() def categorical_simplicial_set_intersection( ^ /home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/numba/compiler.py:734: NumbaDeprecationWarning: Fall-back from the nopython compilation path to the object mode compilation path has been detected, this is deprecated behaviour. For more information visit http://numba.pydata.org/numba-doc/latest/reference/deprecation.html#deprecation-of-object-mode-fall-back-behaviour-when-using-jit File "../../.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/umap_.py", line 570: @numba.jit() def categorical_simplicial_set_intersection( ^ /home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/umap_.py:541: NumbaWarning: Compilation is falling back to object mode WITH looplifting enabled because Function "reset_local_connectivity" failed type inference due to: Untyped global name 'normalize': cannot determine Numba type of <class 'function'> File "../../.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/umap_.py", line 560: def reset_local_connectivity(simplicial_set): <source elided> """ simplicial_set = normalize(simplicial_set, norm="max") ^ /home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/numba/compiler.py:725: NumbaWarning: Function "reset_local_connectivity" was compiled in object mode without forceobj=True. File "../../.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/umap_.py", line 542: @numba.jit() def reset_local_connectivity(simplicial_set): ^ /home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/numba/compiler.py:734: NumbaDeprecationWarning: Fall-back from the nopython compilation path to the object mode compilation path has been detected, this is deprecated behaviour. 
For more information visit http://numba.pydata.org/numba-doc/latest/reference/deprecation.html#deprecation-of-object-mode-fall-back-behaviour-when-using-jit File "../../.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/umap_.py", line 542: @numba.jit() def reset_local_connectivity(simplicial_set): ^ /home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/spectral.py:229: UserWarning: Embedding a total of 3 separate connected components using meta-embedding (experimental)
[41]
# plot
working_umap = dict(
    mode="markers",
    name="Working",
    type="scatter3d",
    x=X_umap[~failed_idx, 0],
    y=X_umap[~failed_idx, 1],
    z=X_umap[~failed_idx, 2],
    marker=dict(size=2, color="rgb(0, 0, 200)"),
)

failed_umap = dict(
    mode="markers",
    name="Failed",
    type="scatter3d",
    x=X_umap[failed_idx, 0],
    y=X_umap[failed_idx, 1],
    z=X_umap[failed_idx, 2],
    marker=dict(size=2, color="rgb(200, 0, 0)"),
)

layout_umap = dict(
    title="Simple Reduction Canberra UMAP, Semisupervised",
    scene=dict(
        xaxis=dict(zeroline=False),
        yaxis=dict(zeroline=False),
        zaxis=dict(zeroline=False),
    ),
)
[42]
fig = dict(data=[working_umap, failed_umap], layout=layout_umap)
plotly.offline.plot(
    fig,
    filename="../../../reports/figures/umap/seagate_n32_canberra_ss.html",
    auto_open=True,
)
'seagate_umapn32_canberra_ss.html'

Reduce Data

Since UMAP seems to be creating useful embeddings, we can use the embeddings as the reduced dataset and classify on that instead. There is some discussion of this approach in the original paper, and more detailed documentation in umap-learn.

[156]
# keep 20 out of 67 dimensions
feat_reducer = UMAP(
    n_components=20, metric="chebyshev", n_neighbors=15, random_state=42
)
X_umap = feat_reducer.fit_transform(X_arr)
/home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/numba/compiler.py:588: NumbaPerformanceWarning: The keyword argument 'parallel=True' was specified but no transformation for parallel execution was possible. To find out why, try turning on parallel diagnostics, see http://numba.pydata.org/numba-doc/latest/user/parallel.html#diagnostics for help. File "../../.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/nndescent.py", line 47: @numba.njit(parallel=True) def nn_descent( ^ /home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/umap/spectral.py:229: UserWarning: Embedding a total of 3 separate connected components using meta-embedding (experimental)
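
The fitted reducer can also embed rows it has not seen before via its transform method. A small sketch (X_new here is random stand-in data with the same number of columns as X_arr, purely for illustration):

[ ]
# project new (already standardized) feature vectors into the same 20-d space
# NOTE: X_new is stand-in random data for illustration only
X_new = np.random.RandomState(42).normal(size=(5, X_arr.shape[1]))
X_new_umap = feat_reducer.transform(X_new)
X_new_umap.shape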

Dask Model Saving Utility

Since models could be either dask models or sklearn models, using just pickle or joblib may not suffice because dask models contain delayed objects. Therefore, models are saved using cloudpickle, which has been tested to work (for trivial cases, at least).

[14]
def save_model(model, fname, suffix=None):
    """Serialize and save a dask or sklearn model

    Arguments:
        model {dask_ml or sklearn object} -- trained model to save
        fname {str} -- name of file to write to

    Keyword Arguments:
        suffix {str} -- suffix in the filename (default: {None})
                        This serves as the identifier of the current
                        state of the model
                        If not provided, the current timestamp is used
    """
    # generate default suffix if needed
    if suffix is None:
        suffix = datetime.datetime.now().strftime("%b_%d_%Y_%H_%M_%S")

    # serialize and write
    with open(fname + "_" + suffix + ".cpkl", "wb") as file:
        cloudpickle.dump(model, file)
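
A matching loader (a small sketch; assumes the file was written by save_model above):

[ ]
def load_model(fpath):
    """Deserialize a model previously saved with save_model

    Arguments:
        fpath {str} -- full path of the .cpkl file (including suffix)

    Returns:
        the unpickled dask_ml or sklearn model object
    """
    with open(fpath, "rb") as file:
        return cloudpickle.load(file)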

Split Data for Models

Since there is a large class imbalance in the dataset, the split must be stratified so that the class proportions in each split match those of the overall data.

[24]
# split the original dataset
with parallel_backend("dask"):
    X_train, X_test, Y_train, Y_test = train_test_split(X_arr, Y_arr, stratify=Y_arr)
[ ]
# split the reduced dataset
with parallel_backend("dask"):
    X_train_umap, X_test_umap, Y_train_umap, Y_test_umap = train_test_split(
        X_umap, Y_arr, stratify=Y_arr
    )
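
A quick sanity check (a small sketch): the fraction of failed drives in each split should match the overall rate of roughly 341 / 82993.

[ ]
# class balance should be preserved by the stratified split (~0.4% failed)
for name, y in [("overall", Y_arr), ("train", Y_train), ("test", Y_test)]:
    y = np.asarray(y)  # materialize the (small) label vector
    print("{}: {} failed out of {} ({:.4%})".format(name, int(y.sum()), len(y), y.sum() / len(y)))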

Clusters

Distance Based

[93]
with parallel_backend("dask"):
    for k in range(2, 10):
        # find kmeans clusters for current k
        km = KMeans(n_clusters=k, random_state=42)
        km.fit(X_arr)

        # get metrics for current k (compare labels against the full ground truth)
        h, c, v = homogeneity_completeness_v_measure(Y_arr, km.labels_)
        print("========================================")
        print("k={}: H={}, C={}, V={}".format(k, h, c, v))
        print("conf mat:")
        print(confusion_matrix(Y_arr, km.labels_))
======================================== k= 2: H=6.848515299160378e-05, C=1.9800378535589948e-05, V=3.071923405442396e-05 conf mat: [[81119 1533] [ 336 5]] ======================================== k= 3: H=0.00037707201658545404, C=0.00010120982184295491, V=0.0001595853681000266 conf mat: [[81118 1338 196] [ 336 5 0] [ 0 0 0]] ======================================== k= 4: H=0.0004120967325706981, C=0.00010087610130649623, V=0.00016207763451594342 conf mat: [[81118 908 148 478] [ 336 4 0 1] [ 0 0 0 0] [ 0 0 0 0]] ======================================== k= 5: H=0.0004702256468091204, C=0.00011183461948498203, V=0.0001806943690480539 conf mat: [[81089 906 149 478 30] [ 336 4 0 1 0] [ 0 0 0 0 0] [ 0 0 0 0 0] [ 0 0 0 0 0]] ======================================== k= 6: H=0.00044448513054675645, C=9.974303213248817e-05, V=0.00016292539673169366 conf mat: [[81089 624 130 326 454 29] [ 336 3 0 1 1 0] [ 0 0 0 0 0 0] [ 0 0 0 0 0 0] [ 0 0 0 0 0 0] [ 0 0 0 0 0 0]] ======================================== k= 7: H=0.00047757221853384613, C=0.00010564183528245437, V=0.00017301231105696234 conf mat: [[81089 684 69 272 125 384 29] [ 336 3 0 1 0 1 0] [ 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0]] ======================================== k= 8: H=0.0008230870194975019, C=0.00017629472566943523, V=0.0002903913364560139 conf mat: [[81089 553 106 197 332 29 301 45] [ 336 2 0 0 1 0 2 0] [ 0 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0 0]] ======================================== k= 9: H=0.0009658610061478699, C=0.0002026164729450131, V=0.0003349646936674023 conf mat: [[81089 329 94 176 29 275 500 127 33] [ 336 1 0 0 0 1 3 0 0] [ 0 0 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0 0 0] [ 0 0 0 0 0 0 0 0 0]]
[16]
with parallel_backend("dask"):
    for k in range(2, 6):
        # find spectral clusters for current k
        sc = SpectralClustering(n_clusters=k, random_state=42)
        sc.fit(X_arr)

        # get metrics for current k (compare labels against the full ground truth)
        h, c, v = homogeneity_completeness_v_measure(Y_arr, sc.labels_)
        print("========================================")
        print("k={}: H={}, C={}, V={}".format(k, h, c, v))
        print("conf mat:")
        print(confusion_matrix(Y_arr, sc.labels_))
--------------------------------------------------------------------------- MemoryError Traceback (most recent call last) <ipython-input-16-7411fa244531> in <module> 3 # find kmeans clusters for current k 4 sc = SpectralClustering(n_clusters=k, random_state=42) ----> 5 sc.fit(X_train) 6 7 # get metrics for current k ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/sklearn/cluster/spectral.py in fit(self, X, y) 482 self.affinity_matrix_ = pairwise_kernels(X, metric=self.affinity, 483 filter_params=True, --> 484 **params) 485 486 random_state = check_random_state(self.random_state) ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/sklearn/metrics/pairwise.py in pairwise_kernels(X, Y, metric, filter_params, n_jobs, **kwds) 1743 raise ValueError("Unknown kernel %r" % metric) 1744 -> 1745 return _parallel_pairwise(X, Y, func, n_jobs, **kwds) ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/sklearn/metrics/pairwise.py in _parallel_pairwise(X, Y, func, n_jobs, **kwds) 1208 # enforce a threading backend to prevent data communication overhead 1209 fd = delayed(_dist_wrapper) -> 1210 ret = np.empty((X.shape[0], Y.shape[0]), dtype=dtype, order='F') 1211 Parallel(backend="threading", n_jobs=n_jobs)( 1212 fd(func, ret, s, X, Y[s], **kwds) MemoryError:

Density Based

To use density-based clustering, we must find a good distance metric and a suitable "threshold" (eps) within which points are considered to belong to the same cluster. This requires tuning; a common heuristic is sketched after the distance check below.

[25]
man = DistanceMetric.get_metric("manhattan")
X_df.index.head()
Index(['6VDHZ9H9', '6VDHZAKY', '6VDHZATY', '6VDHZAXD', '6VDHZB48'], dtype='object', name='serial_number')
[36]
foo = X_df.loc["6VDHZAXD", :].compute()
bar = X_df.loc["6VDHZATY", :].compute()

scipy.spatial.distance.cityblock(foo, bar)
38.522346242013505
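
A common heuristic for choosing eps (not used in the original run; a hedged sketch) is to look at the sorted k-nearest-neighbor distances and pick a value near the "knee" of the curve:

[ ]
from sklearn.neighbors import NearestNeighbors

# work on a subsample to keep the neighbor search cheap (subsample size is arbitrary)
X_sample = np.asarray(X_arr[:5000])

# distance from each sampled point to its 5th nearest neighbor
nn = NearestNeighbors(n_neighbors=5, metric="manhattan")
nn.fit(X_sample)
dists, _ = nn.kneighbors(X_sample)

# sorted 5-NN distances; a sharp bend in this curve suggests a reasonable eps
knee_curve = np.sort(dists[:, -1])
print(knee_curve[:: max(1, len(knee_curve) // 10)])  # coarse look at the curve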
[39]
with parallel_backend("dask"):
    for epsilon in range(1, 6):
        # fit DBSCAN for the current eps
        db = DBSCAN(metric="manhattan", eps=epsilon)
        db.fit(X_arr)

        # get metrics for current eps (compare labels against the full ground truth)
        h, c, v = homogeneity_completeness_v_measure(Y_arr, db.labels_)
        print("========================================")
        print("e={}: H={}, C={}, V={}".format(epsilon, h, c, v))
        print("conf mat:")
        print(confusion_matrix(Y_arr, db.labels_))
======================================== e=1: H=0.0040969275756273325, C=0.0005331403538892181, V=0.0009435012404910959 conf mat: [[ 0 0 0 ... 0 0 0] [80478 5 11 ... 5 6 4] [ 341 0 0 ... 0 0 0] ... [ 0 0 0 ... 0 0 0] [ 0 0 0 ... 0 0 0] [ 0 0 0 ... 0 0 0]] ======================================== e=2: H=0.1669530204593021, C=0.0017809033841935226, V=0.00352421365383619 conf mat: [[ 0 0 0 ... 0 0 0] [26300 13254 119 ... 5 3 3] [ 337 1 0 ... 0 0 0] ... [ 0 0 0 ... 0 0 0] [ 0 0 0 ... 0 0 0] [ 0 0 0 ... 0 0 0]] ======================================== e=3: H=0.32776595457007096, C=0.004098996279768483, V=0.008096735885950135 conf mat: [[ 0 0 0 ... 0 0 0] [ 8132 16991 21 ... 21 5 5] [ 331 4 0 ... 0 0 0] ... [ 0 0 0 ... 0 0 0] [ 0 0 0 ... 0 0 0] [ 0 0 0 ... 0 0 0]] ======================================== e=4: H=0.4098237829785913, C=0.006056014861894233, V=0.011935655126138254 conf mat: [[ 0 0 0 ... 0 0 0] [ 3825 10 21214 ... 8 1199 32] [ 321 0 5 ... 0 0 0] ... [ 0 0 0 ... 0 0 0] [ 0 0 0 ... 0 0 0] [ 0 0 0 ... 0 0 0]] ======================================== e=5: H=0.43639244244612746, C=0.00710538602028686, V=0.013983097823219073 conf mat: [[ 0 0 0 ... 0 0 0] [ 2300 11 22560 ... 12 12 1199] [ 305 0 10 ... 0 0 0] ... [ 0 0 0 ... 0 0 0] [ 0 0 0 ... 0 0 0] [ 0 0 0 ... 0 0 0]]

NOTE

All the following models except the SVM have been fit on the entire dataset. They all do poorly (they cannot even overfit the training data). A possible mitigation for the class imbalance is sketched after the XGBoost results below.

XGBoost (vanilla + UMAP reduced)

[30]
# vanilla classifier
xgbc = XGBClassifier(max_depth=6, random_state=42)
xgbc.fit(X_train, Y_train)
XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,
              colsample_bynode=1, colsample_bytree=1, gamma=0,
              learning_rate=0.1, max_delta_step=0, max_depth=6,
              min_child_weight=1, missing=None, n_estimators=100, n_jobs=1,
              nthread=None, objective='binary:logistic', random_state=42,
              reg_alpha=0, reg_lambda=1, scale_pos_weight=1, seed=None,
              silent=None, subsample=1, verbosity=1)
[32]
preds = xgbc.predict(X_train)
confusion_matrix(Y_train.compute(), preds.compute())
array([[    0, 82652],
       [    0,   341]])
[33]
# reduced data classifier
xgbc_umap = XGBClassifier(max_depth=6, random_state=42)
# xgbc_umap = XGBClassifier(learning_rate=0.1,
#                                max_depth=3,
#                                min_child_weight=0.5,
#                                reg_lambda=0.01,
#                                reg_alpha=0.01,
#                                random_state=42)
xgbc_umap.fit(da.from_array(X_train_umap), Y_train)
XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,
              colsample_bynode=1, colsample_bytree=1, gamma=0,
              learning_rate=0.1, max_delta_step=0, max_depth=6,
              min_child_weight=1, missing=None, n_estimators=100, n_jobs=1,
              nthread=None, objective='binary:logistic', random_state=42,
              reg_alpha=0, reg_lambda=1, scale_pos_weight=1, seed=None,
              silent=None, subsample=1, verbosity=1)
[34]
preds = xgbc_umap.predict(da.from_array(X_train_umap))
confusion_matrix(Y_train.compute(), preds.compute())
array([[    0, 82652],
       [    0,   341]])
[35]
gc.collect()
2649
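
One knob worth exploring for the class imbalance (not tried in this run; a hedged sketch) is XGBoost's scale_pos_weight parameter, which re-weights the failed class relative to the working class:

[ ]
# weight the failed class by the working-to-failed ratio (roughly 82652 / 341)
n_failed = float((Y_train == 1).sum().compute())
n_working = float((Y_train == 0).sum().compute())

xgbc_weighted = XGBClassifier(
    max_depth=6, scale_pos_weight=n_working / n_failed, random_state=42
)
# xgbc_weighted.fit(X_train, Y_train)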

SVM

[76]
with parallel_backend("dask"):
    svc = SVC(C=5, class_weight="balanced", probability=True, random_state=42)
    svc.fit(X_train, Y_train)
/home/kachauha/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/sklearn/svm/base.py:193: FutureWarning: The default value of gamma will change from 'auto' to 'scale' in version 0.22 to account better for unscaled features. Set gamma explicitly to 'auto' or 'scale' to avoid this warning.
SVC(C=5, cache_size=200, class_weight='balanced', coef0=0.0,
    decision_function_shape='ovr', degree=3, gamma='auto_deprecated',
    kernel='rbf', max_iter=-1, probability=True, random_state=42,
    shrinking=True, tol=0.001, verbose=False)
[77]
with parallel_backend("dask"):
    prob_preds_train = svc.predict_proba(X_train)
    prob_preds_test = svc.predict_proba(X_test)
[78]
# threshold probability for a model trained on all the data is ~0.3
TRAIN_THRESHOLD_PROBA = 0.5

# how well did it fit on the train data
confmat_train = confusion_matrix(
    Y_train, prob_preds_train[:, 1] > TRAIN_THRESHOLD_PROBA
)
sns.heatmap(
    confmat_train / confmat_train.sum(axis=1, keepdims=True),
    vmin=0,
    vmax=1,
    annot=True,
    cmap="Blues",
)
<matplotlib.axes._subplots.AxesSubplot at 0x7f0847eeddd8>
[84]
# how well does it generalize
TEST_THRESHOLD_PROBA = 0.005
confmat_test = confusion_matrix(Y_test, prob_preds_test[:, 1] > TEST_THRESHOLD_PROBA)
sns.heatmap(
    confmat_test / confmat_test.sum(axis=1, keepdims=True),
    vmin=0,
    vmax=1,
    annot=True,
    cmap="Blues",
)
<matplotlib.axes._subplots.AxesSubplot at 0x7f0853382fd0>
[149]
# grid search for best threshold proba
for prob in np.arange(0.33, 0.36, 0.005):
    curr_preds = prob_preds_test[:, 1] > prob
    p, r, f, _ = precision_recall_fscore_support(
        y_true=Y_test, y_pred=curr_preds, average="binary"
    )
    print("p={:2f}, prec={:2f}, rec={:2f}, f={:2f}".format(prob, p, r, f))
p=0.330000, prec=0.656716, rec=0.517647, f=0.578947
p=0.335000, prec=0.676923, rec=0.517647, f=0.586667
p=0.340000, prec=0.671875, rec=0.505882, f=0.577181
p=0.345000, prec=0.671875, rec=0.505882, f=0.577181
p=0.350000, prec=0.666667, rec=0.494118, f=0.567568
p=0.355000, prec=0.672131, rec=0.482353, f=0.561644
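
An alternative to the manual sweep above (a sketch using sklearn's precision_recall_curve; not part of the original run) evaluates every candidate threshold at once and picks the one with the best F1:

[ ]
from sklearn.metrics import precision_recall_curve

# precision/recall at every distinct predicted probability
prec, rec, thresholds = precision_recall_curve(np.asarray(Y_test), prob_preds_test[:, 1])

# F1 at each threshold (the last precision/recall point has no associated threshold)
f1 = 2 * prec * rec / np.maximum(prec + rec, 1e-12)
best = np.argmax(f1[:-1])
print("best threshold={:.3f}: prec={:.3f}, rec={:.3f}, f1={:.3f}".format(
    thresholds[best], prec[best], rec[best], f1[best]))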

SVM Ensemble

[ ]
with parallel_backend("dask"):
    # init adaboosted ensemble of svm classifiers
    ada_base_learner = SVC(
        C=2.5, class_weight="balanced", probability=True, random_state=42
    )
    ada_clf = AdaBoostClassifier(
        base_estimator=ada_base_learner,
        n_estimators=100,
        learning_rate=0.5,
        random_state=42,
    )
    # training time
    ada_clf.fit(X_train, Y_train)
[ ]
# how well did it do
with parallel_backend("dask"):
    ada_probs_train = ada_clf.predict_proba(X_train)
    ada_probs_test = ada_clf.predict_proba(X_test)
[ ]
TRAIN_THRESHOLD_PROBA = 0.5
ada_confmat_train = confusion_matrix(
    Y_train, ada_probs_train[:, 1] > TRAIN_THRESHOLD_PROBA
)
sns.heatmap(
    ada_confmat_train / ada_confmat_train.sum(axis=1, keepdims=True),
    vmin=0,
    vmax=1,
    annot=True,
    cmap="Blues",
)
[ ]
TEST_THRESHOLD_PROBA = 0.005
ada_confmat_test = confusion_matrix(Y_test, ada_probs_test[:, 1] > TEST_THRESHOLD_PROBA)
sns.heatmap(
    ada_confmat_test / ada_confmat_test.sum(axis=1, keepdims=True),
    vmin=0,
    vmax=1,
    annot=True,
    cmap="Blues",
)

RGF

[25]
rgfc = RGFClassifier(max_leaf=5000, l2=1e-10, learning_rate=0.01)
rgfc.fit(X_train, Y_train)
--------------------------------------------------------------------------- KeyboardInterrupt Traceback (most recent call last) <ipython-input-25-5cce4edb81a6> in <module> 1 rgfc = RGFClassifier(max_leaf=5000, l2=1e-10, learning_rate=0.01) ----> 2 rgfc.fit(X_train, Y_train) ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/rgf/utils.py in fit(self, X, y, sample_weight) 533 self._validate_params(**self.get_params()) 534 --> 535 X, y = check_X_y(X, y, accept_sparse=True) 536 if sp.isspmatrix(X): 537 self._is_sparse_train_X = True ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/sklearn/utils/validation.py in check_X_y(X, y, accept_sparse, accept_large_sparse, dtype, order, copy, force_all_finite, ensure_2d, allow_nd, multi_output, ensure_min_samples, ensure_min_features, y_numeric, warn_on_dtype, estimator) 717 ensure_min_features=ensure_min_features, 718 warn_on_dtype=warn_on_dtype, --> 719 estimator=estimator) 720 if multi_output: 721 y = check_array(y, 'csr', force_all_finite=True, ensure_2d=False, ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/sklearn/utils/validation.py in check_array(array, accept_sparse, accept_large_sparse, dtype, order, copy, force_all_finite, ensure_2d, allow_nd, ensure_min_samples, ensure_min_features, warn_on_dtype, estimator) 494 try: 495 warnings.simplefilter('error', ComplexWarning) --> 496 array = np.asarray(array, dtype=dtype, order=order) 497 except ComplexWarning: 498 raise ValueError("Complex data not supported\n" ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/numpy/core/numeric.py in asarray(a, dtype, order) 536 537 """ --> 538 return array(a, dtype, copy=False, order=order) 539 540 ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/dask/array/core.py in __array__(self, dtype, **kwargs) 1199 1200 def __array__(self, dtype=None, **kwargs): -> 1201 x = self.compute() 1202 if dtype and x.dtype != dtype: 1203 x = x.astype(dtype) ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs) 173 dask.base.compute 174 """ --> 175 (result,) = compute(self, traverse=False, **kwargs) 176 return result 177 ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs) 444 keys = [x.__dask_keys__() for x in collections] 445 postcomputes = [x.__dask_postcompute__() for x in collections] --> 446 results = schedule(dsk, keys, **kwargs) 447 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)]) 448 ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs) 2566 should_rejoin = False 2567 try: -> 2568 results = self.gather(packed, asynchronous=asynchronous, direct=direct) 2569 finally: 2570 for f in futures.values(): ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous) 1820 direct=direct, 1821 local_worker=local_worker, -> 1822 asynchronous=asynchronous, 1823 ) 1824 ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/distributed/client.py in sync(self, func, *args, **kwargs) 751 return future 752 else: --> 753 return sync(self.loop, 
func, *args, **kwargs) 754 755 def __repr__(self): ~/.local/share/virtualenvs/ceph_drive_failure-3-2yJKyM/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs) 327 else: 328 while not e.is_set(): --> 329 e.wait(10) 330 if error[0]: 331 six.reraise(*error[0]) /usr/lib64/python3.7/threading.py in wait(self, timeout) 550 signaled = self._flag 551 if not signaled: --> 552 signaled = self._cond.wait(timeout) 553 return signaled 554 /usr/lib64/python3.7/threading.py in wait(self, timeout) 298 else: 299 if timeout > 0: --> 300 gotit = waiter.acquire(True, timeout) 301 else: 302 gotit = waiter.acquire(False) KeyboardInterrupt:
[27]
%%time
with parallel_backend("dask"):
    rgfc = RGFClassifier(max_leaf=5000, l2=1e-10, learning_rate=0.01)
    rgfc.fit(X_train, Y_train)
CPU times: user 17.7 s, sys: 2.03 s, total: 19.8 s
Wall time: 6min 59s
[ ]
with parallel_backend("dask"):
    rgf_prob_preds_train = rgfc.predict_proba(X_train)
    rgf_prob_preds_test = rgfc.predict_proba(X_test)
[ ]
# threshold probability for a model tested on all the data is ~0.3
TEST_THRESHOLD_PROBA = 0.5

# how well does it generalize to the test data
rgf_confmat_test = confusion_matrix(
    Y_test, rgf_prob_preds_test[:, 1] > TEST_THRESHOLD_PROBA
)
sns.heatmap(
    rgf_confmat_test / rgf_confmat_test.sum(axis=1, keepdims=True),
    vmin=0,
    vmax=1,
    annot=True,
    cmap="Blues",
)

Hyperparameter Tuning for RGF

The hyperparameter tuning is done using MLflow and OpenShift (see the experiment-tracking and training repos). After tuning the hyperparameters, the RGF model performs very well on the dataset.

We conclude that RGF is the best model thus far.

The best parameters are as follows:

1. l2 = 1e10
2. learning_rate = 0.1
3. loss = "Log"
4. max_leaf = 5000
5. others = default

and the corresponding performance metrics on the test dataset (size: (0.2 x 82993, 67)) are as follows:

1. log_loss = 0.003
2. precision = 0.985
3. recall = 0.941
4. f1_score = 0.962
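
For reference, a sketch of the classifier instantiated with the parameters reported above (the tuned model itself lives in the experiment-tracking and training repos):

[ ]
# RGF with the best hyperparameters listed above; all other parameters left at defaults
rgfc_tuned = RGFClassifier(max_leaf=5000, l2=1e10, learning_rate=0.1, loss="Log")
# rgfc_tuned.fit(X_train, Y_train)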

[ ]