Fleet Predictive Maintenance: Part 3. Training, Hyperparameter Tuning, and Prediction

Using SageMaker Studio to Predict Fault Classification

Background

This notebook is part of a sequence of notebooks whose purpose is to demonstrate a Predictive Maintenance (PrM) solution for automobile fleet maintenance via Amazon SageMaker Studio so that business users have a quick path towards a PrM POC. In this notebook, we focus on training, tuning, and deploying a model. It is the third notebook in the series. You can choose to run it by itself or in sequence with the other notebooks listed below. Please see the README.md for more information about the use case implementation across this sequence of notebooks.

  1. Data Prep: Processing Job from SageMaker Data Wrangler Output

  2. Data Prep: Featurization

  3. Train, Tune and Predict using Batch Transform (current notebook)

Important Notes:

  • Due to cost considerations, the goal of this example is to show you how to use some of SageMaker Studio’s features, not necessarily to achieve the best result.

  • We use the built-in classification algorithm in this example, and a Python 3 (Data Science) Kernel is required.

  • The nature of predictive maintenance solutions requires domain knowledge of the system or machinery. With this in mind, we will make assumptions here for certain elements of this solution, with the acknowledgement that these assumptions should be informed by a domain expert and a main business stakeholder.


Setup

Let’s start by:

  • Installing and importing any dependencies

  • Instantiating SageMaker session

  • Specifying the S3 bucket and prefix that you want to use for your training and model data. This should be within the same region as SageMaker training

  • Defining the IAM role used to give training access to your data

[ ]:
# Install any missing dependencies
!pip install -qU 'sagemaker-experiments==0.1.24' 'sagemaker>=2.16.1' 'boto3' 'awswrangler'
[ ]:
import os
import json
import sys
import collections
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# SageMaker dependencies
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.image_uris import retrieve
import awswrangler as wr

# Create a low-level SageMaker boto3 client and look up the current region.
smclient = boto3.Session().client("sagemaker")
region = boto3.Session().region_name

# This object represents the IAM role that we are assigned.
role = sagemaker.get_execution_role()

sess = sagemaker.Session()
bucket = sess.default_bucket()

# prefix_prm is the path within the bucket where we will upload the training files
prefix_prm = "predmaint"

Before training, we must first upload our data to S3. To see how the existing train, test, and validation datasets were generated, take a look at Data Prep: Processing Job from SageMaker Data Wrangler Output (the first part of this notebook series) followed by Data Prep: Featurization (the second part). See the Background section at the beginning of the notebook for more information.

[ ]:
# helper function to upload the CSV files (CSV format is required by Linear Learner) to S3
def upload_file_to_bucket(bucket, prefix, file_path):
    file_dir, file_name = os.path.split(file_path)
    # quick sanity check that the file parses as CSV before uploading
    df = pd.read_csv(file_path)
    boto3.resource("s3").meta.client.upload_file(
        Filename=file_path, Bucket=bucket, Key=(prefix + "/" + file_name)
    )
    print(f"uploaded {prefix} data location: s3://{bucket}/{prefix}/{file_name}")
    path_to_data = f"s3://{bucket}/{prefix}/{file_name}"
    return path_to_data
[ ]:
# upload the train, test, and validation CSV files to S3
path_to_train_data_prm = upload_file_to_bucket(bucket, "train", "train.csv")
path_to_test_data_prm = upload_file_to_bucket(bucket, "test", "test.csv")
path_to_test_x_data_prm = upload_file_to_bucket(bucket, "test", "test_x.csv")
path_to_valid_data_prm = upload_file_to_bucket(bucket, "validation", "validation.csv")

# let's also set up an output S3 location for the model artifacts produced by training
output_location = f"s3://{bucket}/output"
print(f"training artifacts will be uploaded to: {output_location}")
[ ]:
from sagemaker.inputs import TrainingInput

train_channel = TrainingInput(path_to_train_data_prm, content_type="text/csv")
test_channel = TrainingInput(path_to_test_data_prm, content_type="text/csv")
test_x_channel = TrainingInput(path_to_test_x_data_prm, content_type="text/csv")
valid_channel = TrainingInput(path_to_valid_data_prm, content_type="text/csv")

data_channels = {"train": train_channel, "validation": valid_channel}

The data is now stored in S3 and ready for use by the estimators.
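
As a quick, optional sanity check, the sketch below uses the awswrangler client imported above (as wr) to list the objects under each of the prefixes we just uploaded to:

[ ]:
# optional sanity check: list the uploaded objects with awswrangler (imported above as wr)
for data_prefix in ["train", "test", "validation"]:
    print(data_prefix, wr.s3.list_objects(f"s3://{bucket}/{data_prefix}/"))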


Train

SageMaker Estimator and Experiments

Once you have selected some models that you would like to try out, SageMaker Experiments is a great tool to track and compare them all before selecting the best model to deploy. We will set up an experiment using SageMaker Experiments to track all the model training iterations for the Linear Learner estimators we will try. You can read more about SageMaker Experiments to learn about experiment features, tracking, and comparing outputs.

[ ]:
# setup
# import dependencies
from sagemaker.analytics import ExperimentAnalytics
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker
from time import strftime
[ ]:
if "create_date" not in locals():
    create_date = strftime("%Y-%m-%d-%H-%M-%S")
    %store create_date

    # location within S3 for outputs
    exp_prefix = f"sagemaker-experiments/linear-learner-{create_date}"
    %store exp_prefix

If you used storemagic in a previous run, you can pick up from here using the stored create_date and exp_prefix variables.
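
For example, in a fresh kernel you could restore the stored variables with storemagic before continuing. This is a minimal sketch and is only needed if create_date and exp_prefix are not already defined in this session:

[ ]:
# optional: restore previously stored variables when resuming in a new kernel
# %store -r create_date
# %store -r exp_prefix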

[ ]:
# create the experiment
experiment_name = f"ll-failure-classification-{create_date}"

try:
    my_experiment = Experiment.load(experiment_name=experiment_name)
    print(f"Experiment loaded {experiment_name}: SUCCESS")
except Exception as e:
    if "ResourceNotFound" in str(e):
        my_experiment = Experiment.create(
            experiment_name=experiment_name,
            description="Classification PrM Experiment",
            tags=[{"Key": "my-experiments", "Value": "exp"}],
            sagemaker_boto_client=smclient,
        )
        print(f"Experiment creation {experiment_name}: SUCCESS")

Note: The tags parameter is optional. You can search for the tag using Studio, the SageMaker console, and the SDK. Tags can also be applied to trials and trial components. For information on how to search tags using Studio, see Search by Tag.
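
As a quick illustration of searching by tag with the SDK, the minimal sketch below calls the SageMaker Search API with the tag key and value applied above; it assumes tag-based search is available for the Experiment resource in your region:

[ ]:
# optional: look up experiments carrying the tag applied above via the SageMaker Search API
search_response = smclient.search(
    Resource="Experiment",
    SearchExpression={
        "Filters": [{"Name": "Tags.my-experiments", "Operator": "Equals", "Value": "exp"}]
    },
)
for result in search_response["Results"]:
    print(result["Experiment"]["ExperimentName"])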

[ ]:
with Tracker.create(display_name="training", sagemaker_boto_client=smclient) as tracker:
    tracker.log_input(name="prm-dataset", media_type="s3/uri", value=path_to_train_data_prm)

Now we can begin to specify our linear model using the Amazon SageMaker Linear Learner Estimator. For this binary classification problem, we have the option of selecting between logistic regression and hinge loss (Support Vector Machines). Here are additional resources to learn more about the Input/Output Interface for the Linear Learner Algorithm and the Linear Learner Hyperparameters. One thing to note is that Amazon SageMaker’s Linear Learner actually fits many models in parallel, each with slightly different hyperparameters, and then returns the one with the best fit. This functionality is enabled automatically. There are a number of additional parameters available for the Linear Learner Estimator, so we will start by using the defaults, along with:

  • loss which controls how we penalize mistakes in our model estimates. For this case, we will start with logistic and move to using hinge loss if necessary for model improvement.

  • predictor_type is set to ‘binary_classifier’ since we are trying to predict whether or not a failure occurs.

  • mini_batch_size is set to 99. This value can be tuned for relatively minor improvements in fit and speed, but selecting a reasonable value relative to the dataset is appropriate in most cases.

  • wd or l1, which control regularization. Regularization can prevent model overfitting by keeping our estimates from becoming too finely tuned to the training data, which can hurt generalizability. In this case, though, we’ll leave these parameters at their default of “auto”.

We will start by first building a logistic regression Linear Learner Estimator, setting the hyperparameters and configuring the SageMaker Experiment with the trial created above. We will then evaluate the results of the experiment.

[ ]:
# set output path
lr_output_path = f"s3://{bucket}/{exp_prefix}/output/lr_default"

# with SageMaker v2.0, Image URI function get_image_uri has been replaced with sagemaker.image_uris.retrieve()
container = sagemaker.image_uris.retrieve(
    framework="linear-learner", region=region, version="1", image_scope="training"
)

# create the trial component and the first trial of the experiment for Linear Learner
training_trial_component = tracker.trial_component
trial_name_1 = f"linear-learner-lr-training-job-{create_date}"

# create the trial if it doesn't exist
try:
    my_trial = Trial.load(trial_name=trial_name_1)
    print(f"Loaded existing trial: {trial_name_1}")
except Exception as e:
    if "ResourceNotFound" in str(e):
        my_trial = Trial.create(experiment_name=experiment_name, trial_name=trial_name_1)
        print(f"Create trial {my_trial.trial_name}: SUCCESSFUL \n")

        print(f"Creating logistic regression estimator. \n")
        lr = sagemaker.estimator.Estimator(
            container,
            role,
            instance_count=1,
            instance_type="ml.c4.xlarge",
            output_path=lr_output_path,
            sagemaker_session=sess,
            enable_sagemaker_metrics=True,
        )

        lr.set_hyperparameters(
            predictor_type="binary_classifier",
            loss="logistic",  # default for auto is logistic regression
            epochs=20,  # high number of epochs as early stopping feature will stop training
            mini_batch_size=99,
        )

        lr.fit(
            inputs=data_channels,
            experiment_config={
                "ExperimentName": my_experiment.experiment_name,
                "TrialName": my_trial.trial_name,
                "TrialComponentDisplayName": "ll-lr-training-job",
            },
            logs=True,
        )

We will now train a Linear Learner model with hinge loss, i.e. Support Vector Machines, using the default hyperparameters set in the code below. We will add this trial to the experiment for later comparison.

[ ]:
svm_output_path = f"s3://{bucket}/{exp_prefix}/output/svm_default"
trial_name_2 = f"linear-learner-svm-{create_date}"

# create the trial if it doesn't exist
try:
    my_trial = Trial.load(trial_name=trial_name_2)
    print(f"Loaded existing trial: {trial_name_2}")
except Exception as e:
    if "ResourceNotFound" in str(e):
        my_trial = Trial.create(experiment_name=experiment_name, trial_name=trial_name_2)
        print(f"Create trial {my_trial.trial_name}: SUCCESSFUL")

        svm = sagemaker.estimator.Estimator(
            container,
            role,
            instance_count=1,
            instance_type="ml.c4.xlarge",
            output_path=svm_output_path,
            sagemaker_session=sess,
            enable_sagemaker_metrics=True,
        )
        svm.set_hyperparameters(
            predictor_type="binary_classifier", loss="hinge_loss", epochs=20, mini_batch_size=99
        )

        svm.fit(
            inputs=data_channels,
            experiment_config={
                "ExperimentName": my_experiment.experiment_name,
                "TrialName": my_trial.trial_name,
                "TrialComponentDisplayName": "ll-svm-training-job",
            },
            logs=True,
        )

Next, we will add the hyperparameter binary_classifier_model_selection_criteria, which lets us define which metric the model selection should optimize for (automatic threshold tuning). We can choose between precision, recall, F1, and accuracy, among other metric choices.

It’s important to note that for PrM we must carefully select our evaluation metric based on domain knowledge. For example, optimizing for recall increases the chance of catching all failures, including false alarms, whereas optimizing for precision reduces false alarms but risks missing real failures. It is not hard to see that these metrics do not always capture the business needs, especially the dollar costs and benefits associated with machinery failures.

With this in mind, we will select F1 as the evaluation metric, as it is generally a good choice for imbalanced classification problems. If you would like to learn more about selecting a custom cost-sensitive evaluation metric for your business use case, you can review the article listed in the Additional Resources section below.
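
To make that business trade-off concrete, a custom cost-sensitive score could weight false negatives (missed failures) and false positives (unnecessary maintenance) by their dollar impact. The sketch below is illustrative only; the cost values and confusion-matrix counts are hypothetical placeholders that a domain expert and business stakeholder would need to supply:

[ ]:
# illustrative cost-sensitive evaluation (hypothetical costs; replace with business-supplied values)
def business_cost(tp, fp, tn, fn, cost_missed_failure=5000, cost_false_alarm=300):
    """Total dollar cost of a confusion matrix given per-error costs."""
    return fn * cost_missed_failure + fp * cost_false_alarm


# example: two hypothetical models with similar accuracy but different error mixes
print(business_cost(tp=40, fp=20, tn=120, fn=5))   # recall-oriented model
print(business_cost(tp=30, fp=5, tn=135, fn=15))   # precision-oriented model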

[ ]:
svm_output_path = f"s3://{bucket}/{exp_prefix}/output/svm_threshold"
trial_name_3 = f"linear-learner-svm-thresh-{create_date}"

# create the trial if it doesn't exist
try:
    my_trial = Trial.load(trial_name=trial_name_3)
    print(f"Loaded existing trial: {trial_name_3}")
except Exception as e:
    if "ResourceNotFound" in str(e):
        my_trial = Trial.create(experiment_name=experiment_name, trial_name=trial_name_3)
        print(f"Create trial {my_trial.trial_name}: SUCCESSFUL")

        svm_thresh = sagemaker.estimator.Estimator(
            container,
            role,
            instance_count=1,
            instance_type="ml.c4.xlarge",
            output_path=svm_output_path,
            sagemaker_session=sess,
        )

        svm_thresh.set_hyperparameters(
            predictor_type="binary_classifier",
            loss="hinge_loss",
            binary_classifier_model_selection_criteria="f_beta",
            epochs=20,
            mini_batch_size=99,
        )

        svm_thresh.fit(
            inputs=data_channels,
            experiment_config={
                "ExperimentName": my_experiment.experiment_name,
                "TrialName": my_trial.trial_name,
                "TrialComponentDisplayName": "ll-svm-thresh-training-job",
            },
            logs=True,
        )

Let’s address the class imbalance to try to improve precision and recall

We will set the hyperparameter positive_example_weight_mult to balanced in order to weight examples by class and address the class imbalance issue. Since only about 19% of our examples are failures, we can leverage this built-in hyperparameter to try to improve model performance. Here is more documentation about Linear Learner Hyperparameters.

[ ]:
# Training a binary classifier with hinge loss and balanced class weights

# set output path
svm_output_path = f"s3://{bucket}/{exp_prefix}/output/svm_balanced"
trial_name_4 = f"linear-learner-svm-balanced-{create_date}"

# create the trial if it doesn't exist
try:
    my_trial = Trial.load(trial_name=trial_name_4)
    print(f"Loaded existing trial: {trial_name_4}")
except Exception as e:
    if "ResourceNotFound" in str(e):
        my_trial = Trial.create(experiment_name=experiment_name, trial_name=trial_name_4)
        print(f"Create trial {my_trial.trial_name}: SUCCESSFUL")

        # specify algorithm containers and instantiate an Estimator with hyperparams
        svm_balanced = sagemaker.estimator.Estimator(
            container,
            role,
            instance_count=1,
            instance_type="ml.c4.xlarge",
            output_path=svm_output_path,
            sagemaker_session=sess,
            enable_sagemaker_metrics=True,
        )

        svm_balanced.set_hyperparameters(
            predictor_type="binary_classifier",
            loss="hinge_loss",
            positive_example_weight_mult="balanced",  # this is for dealing with class imbalances
            epochs=20,
            mini_batch_size=99,
        )
        # fit model to data
        svm_balanced.fit(
            inputs=data_channels,
            experiment_config={
                "ExperimentName": my_experiment.experiment_name,
                "TrialName": my_trial.trial_name,
                "TrialComponentDisplayName": "ll-svm-bal-training-job",
            },
            logs=True,
        )
[ ]:
# first we can look at all the trials together to evaluate the performance
trial_component_analytics = ExperimentAnalytics(experiment_name=my_experiment.experiment_name)
analytic_table = trial_component_analytics.dataframe()
analytic_table = analytic_table[
    [
        "TrialComponentName",
        "DisplayName",
        "positive_example_weight_mult",
        "validation:recall - Avg",
        "validation:binary_classification_accuracy - Avg",
        "validation:roc_auc_score - Avg",
        "train:objective_loss - Avg",
        "validation:objective_loss:final - Avg",
        "validation:objective_loss - Avg",
        "validation:binary_f_beta - Avg",
        "validation:precision - Avg",
        "Trials",
        "Experiments",
    ]
]

analytic_table.sort_values(
    ["validation:binary_classification_accuracy - Avg", "validation:binary_f_beta - Avg"],
    ascending=False,
)

Observations:

  • Balancing class weights improved precision, but decreased recall significantly

  • Accuracy for all the models is relatively consistent at around 84%

  • Across all the metrics, SVM with auto hyperparameters performed the best cumulatively

We will move forward with the auto SVM model, labeled as svm-default.

[ ]:
# # option to deploy a predictor here and make predictions against this endpoint

# # initialize the serializer and deserializer
# serializer = sagemaker.serializers.CSVSerializer()
# deserializer = sagemaker.deserializers.JSONDeserializer()

# svm_predictor = svm.deploy(initial_instance_count=1,
#                            instance_type='ml.m4.xlarge',  # or 'local' to test with local mode
#                            serializer=serializer,
#                            deserializer=deserializer,
#                            endpoint_name='svm',
#                            model_name='svm')

Hyperparameter Tuning

Next, we set up the hyperparameter tuning job using SageMaker Automatic Model Tuning.

Note: with the settings below, the hyperparameter tuning job can take about 30 minutes to complete.

Hyperparameters can dramatically affect the performance of trained models, so we need to pick the right values to achieve the best result. Since the results also depend on the dataset, it is important to search for the best hyperparameters using an algorithmic approach that can be automated and run efficiently.

Using Automatic Model Tuning, we specify a range (or, for categorical hyperparameters, a list of possible values) for each hyperparameter we plan to tune. SageMaker hyperparameter tuning then automatically launches multiple training jobs with different hyperparameter settings, evaluates the results of those training jobs based on a predefined “objective metric”, and selects the hyperparameter settings for future attempts based on previous results. For each hyperparameter tuning job, we give it a budget (a maximum number of training jobs), and it completes once that many training jobs have been executed.

In this example, we use the SageMaker Python SDK to set up and manage the hyperparameter tuning job. We first configure the training jobs that the tuning job will launch by creating an estimator, which includes the following configuration:

  • hyperparameters that SageMaker Automatic Model Tuning will tune: learning_rate

  • the maximum number of training jobs it will run to optimize the objective metric: 5

  • the number of parallel training jobs that will run in the tuning job: 2

  • the objective metric that Automatic Model Tuning will use: validation:binary_classification_accuracy

We will also demonstrate how to associate the trial components created by a hyperparameter tuning job with an experiment management trial.

Read the following links for more information on how to Tune a Linear Learner Model and about How Hyperparameter Tuning Works.

[ ]:
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter
from botocore.exceptions import ClientError
[ ]:
# custom base name for the tuning job
prm_tuning_job_name = "ll-svm-tuning-job"
[ ]:
# set output path
svm_output_path = f"s3://{bucket}/{exp_prefix}/output/tuning/svm_default"

# create the tuning job if it doesn't exist
try:
    svm_tune = sagemaker.estimator.Estimator(
        container,
        role,
        instance_count=1,
        instance_type="ml.c4.xlarge",
        output_path=svm_output_path,
        sagemaker_session=sess,
    )

    svm_tune.set_hyperparameters(
        predictor_type="binary_classifier", loss="hinge_loss", epochs=20, mini_batch_size=99
    )

    hyperparameter_ranges = {
        "learning_rate": ContinuousParameter(0.01, 0.5, scaling_type="Logarithmic")
    }

    # configure HyperparameterTuner
    my_tuner = HyperparameterTuner(
        estimator=svm_tune,  # previously-configured Estimator object
        objective_metric_name="validation:binary_classification_accuracy",
        hyperparameter_ranges=hyperparameter_ranges,
        max_jobs=5,
        max_parallel_jobs=2,
        strategy="Random",
        base_tuning_job_name=prm_tuning_job_name,
    )

    # start hyperparameter tuning job
    my_tuner.fit(inputs=data_channels, include_cls_metadata=False)
    print(f"Create tuning job {prm_tuning_job_name}: SUCCESSFUL")
except ClientError as e:
    if "ResourceInUse" in str(e):
        my_tuner = HyperparameterTuner.attach(prm_tuning_job_name)
        print(f"Attach tuning job {prm_tuning_job_name}: SUCCESSFUL")
[ ]:
# check status
boto3.client("sagemaker").describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=my_tuner.latest_tuning_job.job_name
)["HyperParameterTuningJobStatus"]
[ ]:
tune_analytics = sagemaker.HyperparameterTuningJobAnalytics(
    my_tuner.latest_tuning_job.job_name
).dataframe()
[ ]:
# get the most recently created tuning jobs
list_tuning_jobs_response = smclient.list_hyper_parameter_tuning_jobs(
    SortBy="CreationTime", SortOrder="Descending"
)

# inspect output
print(f'Found {len(list_tuning_jobs_response["HyperParameterTuningJobSummaries"])} tuning jobs.')
tuning_jobs = list_tuning_jobs_response["HyperParameterTuningJobSummaries"]
most_recently_created_tuning_job = tuning_jobs[0]
tuning_job_name = most_recently_created_tuning_job["HyperParameterTuningJobName"]
experiment_name = my_experiment.experiment_name
tune_trial_name = tuning_job_name + "-trial"
%store tune_trial_name

print(f"Associate all training jobs created by {tuning_job_name} with trial {tune_trial_name}")
[ ]:
# create the trial if it doesn't exist
try:
    tune_trial = Trial.load(trial_name=tune_trial_name)
    print(f"Loaded existing trial: {tune_trial_name}")
except Exception as e:
    if "ResourceNotFound" in str(e):
        tune_trial = Trial.create(experiment_name=experiment_name, trial_name=tune_trial_name)
        print(f"Create trial {tune_trial.trial_name}: SUCCESSFUL")
[ ]:
from sagemaker import HyperparameterTuningJobAnalytics, Session
from smexperiments.search_expression import Filter, Operator, SearchExpression
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent

# get the training jobs associated with the tuning job
tuning_analytics = HyperparameterTuningJobAnalytics(tuning_job_name, sess)

training_job_summaries = tuning_analytics.training_job_summaries()
training_job_arns = list(map(lambda x: x["TrainingJobArn"], training_job_summaries))
print(
    f"Found {len(training_job_arns)} training jobs for hyperparameter tuning job {tuning_job_name}."
)
[ ]:
import time
from datetime import datetime, timezone

# get the trial components derived from the training jobs
creation_time = most_recently_created_tuning_job["CreationTime"]
creation_time = creation_time.astimezone(timezone.utc)
creation_time = creation_time.strftime("%Y-%m-%dT%H:%M:%SZ")

created_after_filter = Filter(
    name="CreationTime",
    operator=Operator.GREATER_THAN_OR_EQUAL,
    value=str(creation_time),
)

# the training job names contain the tuning job name (and the training job name is in the source arn)
source_arn_filter = Filter(
    name="Source.SourceArn", operator=Operator.CONTAINS, value=tuning_job_name
)
source_type_filter = Filter(
    name="Source.SourceType", operator=Operator.EQUALS, value="SageMakerTrainingJob"
)

search_expression = SearchExpression(
    filters=[created_after_filter, source_arn_filter, source_type_filter]
)

# search iterates over every page of results by default
trial_component_search_results = list(
    TrialComponent.search(search_expression=search_expression, sagemaker_boto_client=smclient)
)
print(f"Found {len(trial_component_search_results)} trial components.")
[ ]:
# associate the trial components with the trial
for tc in trial_component_search_results:
    print(
        f"Associating trial component {tc.trial_component_name} with trial {tune_trial.trial_name}."
    )
    tune_trial.add_trial_component(tc.trial_component_name)
    # sleep to avoid throttling
    time.sleep(0.5)
[ ]:
# here is the output of all of the hyperparameter tuning trial runs
tuning_analytics.dataframe()

Predict

Deploy Model with Batch Transform and Get Inferences for the Test Dataset

Let’s predict on our test dataset to understand how accurate our model is. We will create batch inferences and use a helper function to evaluate the results.

Use batch transform when you:

  • Want to get inferences for an entire dataset and index them to serve inferences in real time

  • Don’t need a persistent endpoint that applications (for example, web or mobile apps) can call to get inferences

  • Don’t need the subsecond latency that SageMaker hosted endpoints provide

Here is additional information about how to Use Batch Transform.

The following code creates a sagemaker.transformer.Transformer object from the model that was trained in the sections above. Then it calls that object’s transform method to create a transform job. When you create the sagemaker.transformer.Transformer object, you specify the number and type of ML instances to use to perform the batch transform job, and the location in Amazon S3 where you want to store the inferences. For more information, see SageMaker Transformer.

Select the best model, deploy it, and get batch predictions

[ ]:
# get best model
best_model = my_tuner.best_estimator()

# the location of the test dataset
batch_input = path_to_test_x_data_prm

# the location to store the results of the batch transform job
batch_output = f"s3://{bucket}/transform/batch-inference"

# create transformer for best model
transformer = best_model.transformer(
    instance_count=1,
    instance_type="ml.m4.xlarge",
    output_path=batch_output,
    accept="application/json",
)

# get batch inferences
transformer.transform(data=batch_input, content_type="text/csv", split_type="Line")

transformer.wait()

Great, now that our Batch Transform job is complete, we can download the predictions and evaluate the results.

[ ]:
!aws s3 cp --recursive $transformer.output_path ./

We see that the transform job returned JSON containing the predictions, including a score and a predicted_label for each record. In this case, score is a continuous value in [0, 1] representing the probability of a failure in the window. predicted_label takes a value of either 0 or 1, where 1 denotes that we predict a failure within the next designated time window and 0 denotes that we predict no failure within that window.
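
For example, a single line of the transform output (test_x.csv.out, the input file name with .out appended) can be parsed as shown in this minimal sketch, which also pulls out the scores in case you want to experiment with a custom decision threshold:

[ ]:
# minimal sketch: inspect the first line of the batch transform output
with open("test_x.csv.out", "r") as f:
    first_line = json.loads(f.readline())

scores = [p["score"] for p in first_line["predictions"]]
labels = [p["predicted_label"] for p in first_line["predictions"]]
print(f"first 5 scores: {scores[:5]}")
print(f"first 5 predicted labels: {labels[:5]}")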

[ ]:
def evaluate_model(batch_file_path, test_labels, model_name, metrics=True):
    """
    Read the downloaded batch predictions and compare them to the actual labels from the test data.
    Return binary classification metrics.
    """

    # open the batch prediction file and parse the JSON to extract the predicted labels
    predicted_labels = []
    with open(batch_file_path, "r") as f:
        for line in f:
            result = json.loads(line)
            # accumulate labels across all output lines rather than keeping only the last batch
            predicted_labels.extend([p["predicted_label"] for p in result["predictions"]])
    predictions = pd.Series(predicted_labels)

    # calculate true positives, false positives, true negatives, false negatives
    tp = np.logical_and(test_labels, predictions).sum()
    fp = np.logical_and(1 - test_labels, predictions).sum()
    tn = np.logical_and(1 - test_labels, 1 - predictions).sum()
    fn = np.logical_and(test_labels, 1 - predictions).sum()

    # calculate binary classification metrics
    recall = tp / (tp + fn)
    precision = tp / (tp + fp)
    accuracy = (tp + tn) / (tp + fp + tn + fn)
    f1 = 2 * precision * recall / (precision + recall)

    if metrics:
        print(pd.crosstab(test_labels, predictions, rownames=["actuals"], colnames=["predictions"]))
        print(f"Accuracy: {accuracy}")
        print(f"F1: {f1}")
        print(f"Recall: {recall}")
        print(f"Precision: {precision}")

    return {
        "TP": tp,
        "FP": fp,
        "FN": fn,
        "TN": tn,
        "Precision": precision,
        "Recall": recall,
        "Accuracy": accuracy,
        "F1": f1,
        "Model": model_name,
    }
[ ]:
# call evaluation function and inspect results
test = pd.read_csv("test.csv", header=None)
test_y = test[0]
evaluate_model("test_x.csv.out", test_y, "PrM-Classification-SVM", metrics=True)

Clean Up (Optional)

Once we’re done, don’t forget to clean up the endpoint and experiments to prevent unnecessary billing.

[ ]:
def delete_endpoint(predictor):
    try:
        predictor.delete_model()
        predictor.delete_endpoint()
        print(f"Deleted {predictor.endpoint_name}")
    except Exception:
        print(f"Already deleted: {predictor.endpoint_name}")
[ ]:
# you can run this cell if you deployed your estimator instead of doing Batch Transform
# delete_endpoint(svm_predictor)

(Optional) To delete the experiments created, you can use delete_all or the cleanup_boto3 helper function below to delete an individual experiment by passing the experiment name to the function.

[ ]:
# run this to delete the experiment and all its related trials and trial components
# my_experiment.delete_all(action='--force')
sm = boto3.Session().client("sagemaker")


def cleanup_boto3(experiment_name):
    trials = sm.list_trials(ExperimentName=experiment_name)["TrialSummaries"]
    print("TrialNames:")
    for trial in trials:
        trial_name = trial["TrialName"]
        print(f"\n{trial_name}")

        components_in_trial = sm.list_trial_components(TrialName=trial_name)
        print("\tTrialComponentNames:")
        for component in components_in_trial["TrialComponentSummaries"]:
            component_name = component["TrialComponentName"]
            print(f"\t{component_name}")
            sm.disassociate_trial_component(TrialComponentName=component_name, TrialName=trial_name)
            try:
                # comment out to keep trial components
                sm.delete_trial_component(TrialComponentName=component_name)
            except:
                # component is associated with another trial
                continue
            # to prevent throttling
            time.sleep(0.5)
        sm.delete_trial(TrialName=trial_name)
    sm.delete_experiment(ExperimentName=experiment_name)
    print(f"\nExperiment {experiment_name} deleted")


# Call cleanup_boto3

# Use experiment name not display name
experiment_name = my_experiment.experiment_name
# cleanup_boto3(experiment_name)

Extensions

  • Our linear model does a relatively good job of predicting whether a failure will occur in the next window of time, with an overall accuracy of close to 84%, but we can re-run the model with different values for the hyperparameters, loss functions, etc., and see if we get improved predictions

    • Re-running the model with further tweaks to these hyperparameters may provide more accurate out-of-sample predictions

  • We also did not do much feature engineering

    • We can create additional features by considering cross-products/interactions of multiple features, squaring features or raising them to higher powers to induce non-linear effects, etc.

    • If we expand the features using non-linear terms and interactions, we can then tweak the regularization parameter to optimize the expanded model and hence generate improved forecasts

  • As a further extension, we can experiment with tree-based models such as XGBoost, Random Forest, or gradient boosting to address the class imbalance, as these models are less sensitive to it (see the sketch after this list)

  • Once the model performance has met the business’s desired output, it can be deployed at the edge using AWS IoT Greengrass
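
As one illustration of the tree-based extension above, here is a minimal sketch of how the built-in SageMaker XGBoost algorithm might be configured with scale_pos_weight to account for the class imbalance. The container version, instance type, number of rounds, and weight ratio are illustrative assumptions rather than tuned values:

[ ]:
# illustrative sketch: built-in XGBoost with class-imbalance weighting (values are assumptions)
xgb_container = sagemaker.image_uris.retrieve(framework="xgboost", region=region, version="1.2-1")

xgb = sagemaker.estimator.Estimator(
    xgb_container,
    role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=f"s3://{bucket}/{prefix_prm}/output/xgb",
    sagemaker_session=sess,
)

xgb.set_hyperparameters(
    objective="binary:logistic",
    eval_metric="auc",
    num_round=100,
    # ratio of negative to positive examples (~81% vs ~19% failures, per the class balance noted earlier)
    scale_pos_weight=81 / 19,
)

# xgb.fit(inputs=data_channels)  # uncomment to launch a training job (incurs cost)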

Additional Resources