Amazon SageMaker LightGBM Distributed training using Dask


This notebook’s CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.

This us-west-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable


1. Set Up

[ ]:
!pip install sagemaker ipywidgets --upgrade
[ ]:
import sagemaker, boto3, json
from sagemaker import get_execution_role

aws_role = get_execution_role()
aws_region = boto3.Session().region_name
sess = sagemaker.Session()

bucket = sess.default_bucket()
prefix = "sagemaker/DEMO-churn-dt"
[ ]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import io
import os
import sys
import time
import json
from IPython.display import display
from time import strftime, gmtime
from sagemaker.inputs import TrainingInput
from sagemaker.serializers import CSVSerializer
from sklearn import preprocessing

2. Data Preparation and Visualization

Mobile operators have historical records on which customers ultimately ended up churning and which continued using the service. We can use this historical information to construct an ML model of one mobile operator’s churn using a process called training. After training the model, we can pass the profile information of an arbitrary customer (the same profile information that we used to train the model) to the model, and have the model predict whether this customer is going to churn. Of course, we expect the model to make mistakes. After all, predicting the future is tricky business! But we’ll learn how to deal with prediction errors.

The dataset we use is publicly available and was mentioned in the book Discovering Knowledge in Data by Daniel T. Larose. It is attributed by the author to the University of California Irvine Repository of Machine Learning Datasets. Let’s download and read that dataset in now:

[ ]:
s3 = boto3.client("s3")
s3.download_file(
    f"sagemaker-example-files-prod-{aws_region}",
    "datasets/tabular/synthetic/churn.txt",
    "churn.txt",
)
[ ]:
churn = pd.read_csv("./churn.txt")
pd.set_option("display.max_columns", 500)
churn.head(5)

By modern standards, it’s a relatively small dataset, with only 5,000 records, where each record uses 21 attributes to describe the profile of a customer of an unknown US mobile operator. The attributes are:

State: the US state in which the customer resides, indicated by a two-letter abbreviation; for example, OH or NJ

Account Length: the number of days that this account has been active

Area Code: the three-digit area code of the corresponding customer’s phone number

Phone: the remaining seven-digit phone number

Int’l Plan: whether the customer has an international calling plan: yes/no

VMail Plan: whether the customer has a voice mail feature: yes/no

VMail Message: the average number of voice mail messages per month

Day Mins: the total number of calling minutes used during the day

Day Calls: the total number of calls placed during the day

Day Charge: the billed cost of daytime calls

Eve Mins, Eve Calls, Eve Charge: the billed cost for calls placed during the evening

Night Mins, Night Calls, Night Charge: the billed cost for calls placed during nighttime

Intl Mins, Intl Calls, Intl Charge: the billed cost for international calls

CustServ Calls: the number of calls placed to Customer Service

Churn?: whether the customer left the service: true/false

The last attribute, Churn?, is known as the target attribute: the attribute that we want the ML model to predict. Because the target attribute is binary, our model will be performing binary prediction, also known as binary classification.

Let’s begin exploring the data:

[ ]:
# Histograms for each numeric features
display(churn.describe())
%matplotlib inline
hist = churn.hist(bins=30, sharey=True, figsize=(10, 10))

We can see immediately that: - State appears to be quite evenly distributed. - Phone takes on too many unique values to be of any practical use. It’s possible that parsing out the prefix could have some value, but without more context on how these are allocated, we should avoid using it. - Most of the numeric features are surprisingly nicely distributed, with many showing bell-like gaussianity. VMail Message is a notable exception.

[ ]:
churn = churn.drop("Phone", axis=1)
churn["Area Code"] = churn["Area Code"].astype(object)

Next let’s look at the relationship between each of the features and our target variable.

[ ]:
for column in churn.select_dtypes(include=["object"]).columns:
    if column != "Churn?":
        display(pd.crosstab(index=churn[column], columns=churn["Churn?"], normalize="columns"))

for column in churn.select_dtypes(exclude=["object"]).columns:
    print(column)
    hist = churn[[column, "Churn?"]].hist(by="Churn?", bins=30)
    plt.show()

We convert the target attribute to binary value and move it to the first column of the dataset to meet requirements of SageMaker built-in tabular algorithms (For an example, see SageMaker LightGBM documentation).

[ ]:
churn["target"] = churn["Churn?"].map({"True.": 1, "False.": 0})
churn.drop(["Churn?"], axis=1, inplace=True)
[ ]:
churn = churn[["target"] + churn.columns.tolist()[:-1]]

We identify the column indexes of the categorical attribute, which is required by LightGBM, CatBoost, and TabTransformer algorithm (AutoGluon-Tabular has built-in feature engineering to identify the categorical attribute automatically, and thus does not require such input).

[ ]:
cat_columns = [
    "State",
    "Account Length",
    "Area Code",
    "Phone",
    "Int'l Plan",
    "VMail Plan",
    "VMail Message",
    "Day Calls",
    "Eve Calls",
    "Night Calls",
    "Intl Calls",
    "CustServ Calls",
]

cat_idx = []
for idx, col_name in enumerate(churn.columns.tolist()):
    if col_name in cat_columns:
        cat_idx.append(idx)
[ ]:
with open("cat_idx.json", "w") as outfile:
    json.dump({"cat_idx": cat_idx}, outfile)

LightGBM official documentation requires that all categorical features should be encoded as non-negative integers.

[ ]:
for idx, col_name in enumerate(churn.columns.tolist()):
    if col_name in cat_columns:
        le = preprocessing.LabelEncoder()
        churn[col_name] = le.fit_transform(churn[col_name])

We split the churn dataset into train, validation, and test set using stratified sampling. Validation set is used for early stopping and AMT. Test set is used for performance evaluations in the end. Next, we upload them into a S3 path for training.

The structure of the S3 path for training should be structured as below.

  • The supported input data format for training is csv. You are allowed to put more than 1 data file under both train and valdiation channel. The name of data file can be any one as long as it ends with .csv.

  • The first column corresponds to the target and the rest of columns correspond to features. This follows the convention of SageMaker XGBoost algorithm.

  • The cat_idx.json is categorical column indexes. It contains a dictionary of a key-value pair. The key can be any string. The value is the list of column indexes of categorical features. The index starts with value 1 as value 0 corresponds to the target variable. Please see example above to format the cat_idx.json.

  • For the validation data, we encourage you to include one data file under its channel such that the all of the validation data points can be assigned to one machine. Thus, the validation score is for all of the validation data points and can be easily parsed by the AMT for hyperparameter optimization.

  • Current distributed training only supports CPU.

train      – data_1.csv      – data_2.csv      – data_3.csv      – cat_idx.json

validation      – data.csv

[ ]:
from sklearn.model_selection import train_test_split

train, val_n_test = train_test_split(
    churn, test_size=0.3, random_state=42, stratify=churn["target"]
)
[ ]:
val, test = train_test_split(
    val_n_test, test_size=0.3, random_state=42, stratify=val_n_test["target"]
)
[ ]:
train.to_csv("train.csv", header=False, index=False)
val.to_csv("validation.csv", header=False, index=False)
test.to_csv("test.csv", header=False, index=False)

For demonstartion purpose on including multiple files under the training channel, we simply duplicate the training data multiple times as shown below.

[ ]:
from tqdm import tqdm

for i in tqdm(range(200)):
    boto3.Session().resource("s3").Bucket(bucket).Object(
        os.path.join(prefix, f"train/data_{i}.csv")
    ).upload_file("train.csv")
[ ]:
boto3.Session().resource("s3").Bucket(bucket).Object(
    os.path.join(prefix, "validation/data.csv")
).upload_file("validation.csv")
[ ]:
boto3.Session().resource("s3").Bucket(bucket).Object(
    os.path.join(prefix, "test/data.csv")
).upload_file("test.csv")
[ ]:
boto3.Session().resource("s3").Bucket(bucket).Object(
    os.path.join(prefix, "train/cat_idx.json")
).upload_file("cat_idx.json")

3. Distributedly Train A SageMaker LightGBM Model with AMT

3.1. Retrieve Training Artifacts


Here, we retrieve the training docker container, the training algorithm source, and the tabular algorithm. Note that model_version=”*” fetches the latest model.

For the training algorithm, we have four choices in this demonstration for classification task. * LightGBM: To use this algorithm, specify train_model_id as lightgbm-classification-model in the cell below.

For regression task, the train_model_id is lightgbm-regression-model.


[ ]:
from sagemaker import image_uris, model_uris, script_uris

train_model_id, train_model_version, train_scope = "lightgbm-classification-model", "*", "training"
training_instance_type = "ml.m5.4xlarge"

# Retrieve the docker image
train_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    model_id=train_model_id,
    model_version=train_model_version,
    image_scope=train_scope,
    instance_type=training_instance_type,
)
# Retrieve the training script
train_source_uri = script_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, script_scope=train_scope
)
# Retrieve the pre-trained model tarball to further fine-tune
train_model_uri = model_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, model_scope=train_scope
)

3.2. Set Training Parameters


Now that we are done with all the setup that is needed, we are ready to train our tabular algorithm. To begin, let us create a `sageMaker.estimator.Estimator <https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html>`__ object. This estimator will launch the training job.

There are two kinds of parameters that need to be set for training. The first one are the parameters for the training job. These include: (i) Training data path. This is S3 folder in which the input data is stored, (ii) Output path: This the s3 folder in which the training output is stored. (iii) Training instance type: This indicates the type of machine on which to run the training.

The second set of parameters are algorithm specific training hyper-parameters.


[ ]:
training_dataset_s3_path = f"s3://{bucket}/{prefix}/train"
validation_dataset_s3_path = f"s3://{bucket}/{prefix}/validation"

output_prefix = "jumpstart-example-tabular-training"
s3_output_location = f"s3://{bucket}/{output_prefix}/output_lgb"
[ ]:
from sagemaker import hyperparameters

# Retrieve the default hyper-parameters for fine-tuning the model
hyperparameters = hyperparameters.retrieve_default(
    model_id=train_model_id, model_version=train_model_version
)

# [Optional] Override default hyperparameters with custom values
hyperparameters["num_boost_round"] = "200"


hyperparameters["metric"] = "auc"
hyperparameters["tree_learner"] = "voting"  # use AllReduce method for distributed training

del hyperparameters[
    "early_stopping_rounds"
]  # current distributed training with early stopping has some issues. See https://github.com/microsoft/SynapseML/issues/728#issuecomment-1221599961
# thus it is disabled for distributed training.
print(hyperparameters)

3.3. Train with Automatic Model Tuning

Amazon SageMaker automatic model tuning, also known as hyperparameter tuning, finds the best version of a model by running many training jobs on your dataset using the algorithm and ranges of hyperparameters that you specify. It then chooses the hyperparameter values that result in a model that performs the best, as measured by a metric that you choose. We will use a HyperparameterTuner object to interact with Amazon SageMaker hyperparameter tuning APIs.

  • Note. In this notebook, we set AMT budget (total tuning jobs) as 10 for each of the tabular algorithm except AutoGluon-Tabular. For AutoGluon-Tabular, it succeeds by ensembling multiple models and stacking them in multiple layers.

[ ]:
from sagemaker.tuner import ContinuousParameter, IntegerParameter, HyperparameterTuner

use_amt = True

hyperparameter_ranges_lgb = {
    "learning_rate": ContinuousParameter(1e-4, 1, scaling_type="Logarithmic"),
    "num_boost_round": IntegerParameter(2, 30),
    "num_leaves": IntegerParameter(10, 50),
    "feature_fraction": ContinuousParameter(0.1, 1),
    "bagging_fraction": ContinuousParameter(0.1, 1),
    "bagging_freq": IntegerParameter(1, 10),
    "max_depth": IntegerParameter(5, 30),
    "min_data_in_leaf": IntegerParameter(5, 50),
}

3.4. Start Training

[ ]:
from sagemaker.estimator import Estimator
import random

training_job_name = "jumpstart-distr-lgb-g" + str(int(time.time()))

# Create SageMaker Estimator instance
tabular_estimator = Estimator(
    role=aws_role,
    image_uri=train_image_uri,
    source_dir=train_source_uri,
    model_uri=train_model_uri,
    entry_point="transfer_learning.py",
    instance_count=4,  ### select the instance count you would like to use for distributed training
    volume_size=30,  ### volume_size (int or PipelineVariable): Size in GB of the storage volume to use for storing input and output data during training (default: 30).
    instance_type=training_instance_type,
    max_run=360000,
    hyperparameters=hyperparameters,
    output_path=s3_output_location,
)

if use_amt:
    tuner = HyperparameterTuner(
        tabular_estimator,
        "auc",
        hyperparameter_ranges_lgb,
        [{"Name": "auc", "Regex": "auc: ([0-9\\.]+)"}],
        max_jobs=21,
        max_parallel_jobs=3,
        objective_type="Maximize",
    )

    tuner.fit(
        {
            "train": training_dataset_s3_path,
            "validation": validation_dataset_s3_path,
        },
        logs=True,
        job_name=training_job_name,
    )
else:
    # Launch a SageMaker Training job by passing s3 path of the training data
    tabular_estimator.fit(
        {
            "train": training_dataset_s3_path,
            "validation": validation_dataset_s3_path,
        },
        logs=True,
        job_name=training_job_name,
    )

3.5. Deploy and Run Inference on the Trained Tabular Model


In this section, you learn how to query an existing endpoint and make predictions of the examples you input. For each example, the model will output the probability of the sample for each class in the model. Next, the predicted class label is obtained by taking the class label with the maximum probability over others.

We start by retrieving the artifacts and deploy the tabular_estimator that we trained.


[ ]:
inference_instance_type = "ml.m5.4xlarge"

# Retrieve the inference docker container uri
deploy_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    image_scope="inference",
    model_id=train_model_id,
    model_version=train_model_version,
    instance_type=inference_instance_type,
)
# Retrieve the inference script uri
deploy_source_uri = script_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, script_scope="inference"
)

endpoint_name = "jumpstart-distr-lgb-g" + str(int(time.time()))

# Use the estimator from the previous step to deploy to a SageMaker endpoint
predictor = (tuner if use_amt else tabular_estimator).deploy(
    initial_instance_count=1,
    instance_type=inference_instance_type,
    entry_point="inference.py",
    image_uri=deploy_image_uri,
    source_dir=deploy_source_uri,
    endpoint_name=endpoint_name,
)
[ ]:
newline, bold, unbold = "\n", "\033[1m", "\033[0m"

import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

# read the data
test_data_file_name = "test.csv"
test_data = pd.read_csv(test_data_file_name, header=None)
test_data.columns = ["Target"] + [f"Feature_{i}" for i in range(1, test_data.shape[1])]

num_examples, num_columns = test_data.shape
print(
    f"{bold}The test dataset contains {num_examples} examples and {num_columns} columns.{unbold}\n"
)

# prepare the ground truth target and predicting features to send into the endpoint.
ground_truth_label, features = test_data.iloc[:, :1], test_data.iloc[:, 1:]

print(f"{bold}The first 5 observations of the data: {unbold} \n")
test_data.head(5)
[ ]:
content_type = "text/csv"


def query_endpoint(encoded_tabular_data, endpoint_name):
    client = boto3.client("runtime.sagemaker")
    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType=content_type,
        Body=encoded_tabular_data,
    )
    return response


def parse_response(query_response):
    model_predictions = json.loads(query_response["Body"].read())
    predicted_probabilities = model_predictions["probabilities"]
    return np.array(predicted_probabilities)


# split the test data into smaller size of batches to query the endpoint if test data has large size.
batch_size = 1500
predict_prob = []
for i in np.arange(0, num_examples, step=batch_size):
    query_response_batch = query_endpoint(
        features.iloc[i : (i + batch_size), :].to_csv(header=False, index=False).encode("utf-8"),
        endpoint_name,
    )
    predict_prob_batch = parse_response(query_response_batch)  # prediction probability per batch
    predict_prob.append(predict_prob_batch)


predict_prob = np.concatenate(predict_prob, axis=0)
predict_label = np.argmax(predict_prob, axis=1)

3.6. Evaluate the Prediction Results Returned from the Endpoint

[ ]:
# Visualize the predictions results by plotting the confusion matrix.
conf_matrix = confusion_matrix(y_true=ground_truth_label.values, y_pred=predict_label)
fig, ax = plt.subplots(figsize=(7.5, 7.5))
ax.matshow(conf_matrix, cmap=plt.cm.Blues, alpha=0.3)
for i in range(conf_matrix.shape[0]):
    for j in range(conf_matrix.shape[1]):
        ax.text(x=j, y=i, s=conf_matrix[i, j], va="center", ha="center", size="xx-large")

plt.xlabel("Predictions", fontsize=18)
plt.ylabel("Actuals", fontsize=18)
plt.title("Confusion Matrix", fontsize=18)
plt.show()
[ ]:
# Measure the prediction results quantitatively.
eval_accuracy = accuracy_score(ground_truth_label.values, predict_label)
eval_f1 = f1_score(ground_truth_label.values, predict_label)
eval_auc = roc_auc_score(ground_truth_label.values, predict_prob[:, 1])

lgb_results = pd.DataFrame.from_dict(
    {
        "Accuracy": eval_accuracy,
        "F1": eval_f1,
        "AUC": eval_auc,
    },
    orient="index",
    columns=["LightGBM with AMT"],
)

lgb_results
[ ]:
# Delete the SageMaker endpoint and the attached resources
predictor.delete_model()
predictor.delete_endpoint()

Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.

This us-east-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This us-east-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This us-west-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ca-central-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This sa-east-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-west-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-west-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-west-3 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-central-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-north-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-southeast-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-southeast-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-northeast-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-northeast-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-south-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable