Fleet Predictive Maintenance: Part 1. Data Preparation with SageMaker Data Wrangler

Using SageMaker Studio to Predict Fault Classification

Background

This notebook is part of a sequence of notebooks that demonstrates a Predictive Maintenance (PrM) solution for automobile fleet maintenance via Amazon SageMaker Studio, giving business users a quick path toward a PrM POC. In this notebook, we focus on preprocessing engine sensor data. It is the first notebook in the series; you can run it by itself or in sequence with the other notebooks listed below. Please see the README.md for more information about the use case implemented by this sequence of notebooks.

  1. Data Prep: Processing Job from Data Wrangler Output (current notebook)

  2. Data Prep: Featurization

  3. Train, Tune and Predict using Batch Transform

Important Notes:

  • Due to cost considerations, the goal of this example is to show you how to use some of SageMaker Studio’s features, not necessarily to achieve the best result.

  • We use the built-in classification algorithm in this example, and a Python 3 (Data Science) Kernel is required.

  • The nature of predictive maintenance solutions requires domain expertise about the system or machinery. With this in mind, we will make assumptions here for certain elements of this solution, with the acknowledgement that these assumptions should be informed by a domain expert and a main business stakeholder.


SageMaker Data Wrangler Job Notebook

This notebook uses the Data Wrangler .flow file to submit a SageMaker Data Wrangler Job with the following steps:

  • Push Data Wrangler .flow file to S3

  • Parse the .flow file inputs, and create the argument dictionary to submit to a boto client

  • Submit the ProcessingJob arguments and wait for Job completion

Optionally, the notebook also gives an example of starting a SageMaker XGBoost TrainingJob using the newly processed data.

[ ]:
# Upgrade SageMaker to the latest version
! pip install --upgrade sagemaker
[ ]:
import json
import os
import time
import uuid

import boto3
import sagemaker

Parameters

The following cell lists the configurable parameters that are used throughout this notebook.

[ ]:
# S3 bucket for saving processing job outputs
# Feel free to specify a different bucket here if you wish.
sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = "data_wrangler_flows"
flow_id = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
flow_name = f"flow-{flow_id}"
flow_uri = f"s3://{bucket}/{prefix}/{flow_name}.flow"

flow_file_name = "dw_flow/prm.flow"

iam_role = sagemaker.get_execution_role()

# Processing Job Resources Configurations
# A Data Wrangler processing job only supports 1 instance.
instance_count = 1
instance_type = "ml.m5.4xlarge"

# Processing Job Path URI Information. This is where the output data from SageMaker Data Wrangler will be stored.
output_prefix = f"export-{flow_name}/output"
output_path = f"s3://{bucket}/{output_prefix}"
# The output name is auto-generated from the selected node's ID + the output name from the flow file, which specifies how the data will be transformed.
output_name = "ff586e7b-a02d-472b-91d4-da3dd05d7a30.default"

processing_job_name = f"data-wrangler-flow-processing-{flow_id}"

processing_dir = "/opt/ml/processing"

# Modify the variable below to specify the content type to be used for writing each output
# Currently supported options are 'CSV' or 'PARQUET', and default to 'CSV'
output_content_type = "CSV"

# URL to use for sagemaker client.
# If this is None, boto will automatically construct the appropriate URL to use
# when communicating with sagemaker.
sagemaker_endpoint_url = None
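
The output name above is copied from the exported flow. If you re-export or edit the flow in Data Wrangler, the node ID can change; the commented sketch below shows one way you could derive it from the flow file itself, assuming the last entry in the flow's "nodes" list is the node you want to export and that each node carries a "node_id" field with a "default" output.

[ ]:
# Sketch (assumption): derive the output name from the flow file instead of hard-coding it
# with open(flow_file_name) as f:
#     _flow_def = json.load(f)
# print(f"{_flow_def['nodes'][-1]['node_id']}.default")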

For this demo, the following cell has been added to the code generated by the Data Wrangler export. The changes update the S3 bucket in the .flow file to match your S3 location and retrieve the correct Data Wrangler container URI for your region.

[ ]:
from demo_helpers import update_dw_s3uri

# update the flow file to change the s3 location to our bucket
update_dw_s3uri(flow_file_name)

# get the Data Wrangler container associated with our region
region = boto3.Session().region_name
container_uri = sagemaker.image_uris.retrieve("data-wrangler", region, version="1.0.1")

dw_output_path_prm = output_path
print(
    f"Storing dw_output_path_prm = {dw_output_path_prm} for use in next notebook 2_fleet_predmaint.ipynb"
)
%store dw_output_path_prm
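
In the next notebook, the stored value can be recovered with IPython's storemagic, for example:

[ ]:
# Run this in 2_fleet_predmaint.ipynb to restore the stored variable
# %store -r dw_output_path_prm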

Push Flow to S3

Use the following cell to upload the Data Wrangler .flow file to Amazon S3 so that it can be used as an input to the processing job.

[ ]:
# Load .flow file
with open(flow_file_name) as f:
    flow = json.load(f)

# Upload to S3
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name, bucket, f"{prefix}/{flow_name}.flow")

print(f"Data Wrangler Flow notebook uploaded to {flow_uri}")

Create Processing Job arguments

This notebook submits a Processing Job using the SageMaker Python SDK. Below, utility methods are defined for creating Processing Job Inputs for the following sources: S3, Athena, and Redshift.

[ ]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.dataset_definition.inputs import (
    AthenaDatasetDefinition,
    DatasetDefinition,
    RedshiftDatasetDefinition,
)


def create_flow_notebook_processing_input(base_dir, flow_s3_uri):
    return ProcessingInput(
        source=flow_s3_uri,
        destination=f"{base_dir}/flow",
        input_name="flow",
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_data_distribution_type="FullyReplicated",
    )


def create_s3_processing_input(s3_dataset_definition, name, base_dir):
    return ProcessingInput(
        source=s3_dataset_definition["s3ExecutionContext"]["s3Uri"],
        destination=f"{base_dir}/{name}",
        input_name=name,
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_data_distribution_type="FullyReplicated",
    )


def create_athena_processing_input(athena_dataset_definition, name, base_dir):
    return ProcessingInput(
        input_name=name,
        dataset_definition=DatasetDefinition(
            local_path=f"{base_dir}/{name}",
            athena_dataset_definition=AthenaDatasetDefinition(
                catalog=athena_dataset_definition["catalogName"],
                database=athena_dataset_definition["databaseName"],
                query_string=athena_dataset_definition["queryString"],
                output_s3_uri=athena_dataset_definition["s3OutputLocation"] + f"{name}/",
                output_format=athena_dataset_definition["outputFormat"].upper(),
            ),
        ),
    )


def create_redshift_processing_input(redshift_dataset_definition, name, base_dir):
    return ProcessingInput(
        input_name=name,
        dataset_definition=DatasetDefinition(
            local_path=f"{base_dir}/{name}",
            redshift_dataset_definition=RedshiftDatasetDefinition(
                cluster_id=redshift_dataset_definition["clusterIdentifier"],
                database=redshift_dataset_definition["database"],
                db_user=redshift_dataset_definition["dbUser"],
                query_string=redshift_dataset_definition["queryString"],
                cluster_role_arn=redshift_dataset_definition["unloadIamRole"],
                output_s3_uri=redshift_dataset_definition["s3OutputLocation"] + f"{name}/",
                output_format=redshift_dataset_definition["outputFormat"].upper(),
            ),
        ),
    )


def create_processing_inputs(processing_dir, flow, flow_uri):
    """Helper function for creating processing inputs
    :param processing_dir: local directory used inside the processing container
    :param flow: loaded Data Wrangler flow definition
    :param flow_uri: S3 URI of the Data Wrangler flow file
    """
    processing_inputs = []
    flow_processing_input = create_flow_notebook_processing_input(processing_dir, flow_uri)
    processing_inputs.append(flow_processing_input)

    for node in flow["nodes"]:
        if "dataset_definition" in node["parameters"]:
            data_def = node["parameters"]["dataset_definition"]
            name = data_def["name"]
            source_type = data_def["datasetSourceType"]

            if source_type == "S3":
                processing_inputs.append(create_s3_processing_input(data_def, name, processing_dir))
            elif source_type == "Athena":
                processing_inputs.append(
                    create_athena_processing_input(data_def, name, processing_dir)
                )
            elif source_type == "Redshift":
                processing_inputs.append(
                    create_redshift_processing_input(data_def, name, processing_dir)
                )
            else:
                raise ValueError(f"{source_type} is not supported for Data Wrangler Processing.")

    return processing_inputs


def create_processing_output(output_name, output_path, processing_dir):
    return ProcessingOutput(
        output_name=output_name,
        source=os.path.join(processing_dir, "output"),
        destination=output_path,
        s3_upload_mode="EndOfJob",
    )


def create_container_arguments(output_name, output_content_type):
    output_config = {output_name: {"content_type": output_content_type}}
    return [f"--output-config '{json.dumps(output_config)}'"]

Start ProcessingJob

Now, the Processing Job is submitted using the Processor from the SageMaker Python SDK. Logs are turned off, but can be turned on for debugging purposes.

[ ]:
%%time
from sagemaker.processing import Processor

processor = Processor(
    role=iam_role,
    image_uri=container_uri,
    instance_count=instance_count,
    instance_type=instance_type,
    sagemaker_session=sess,
)

processor.run(
    inputs=create_processing_inputs(processing_dir, flow, flow_uri),
    outputs=[create_processing_output(output_name, output_path, processing_dir)],
    arguments=create_container_arguments(output_name, output_content_type),
    wait=True,
    logs=False,
    job_name=processing_job_name,
)
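
Because logs are disabled in the call above, you can still confirm the job finished successfully by describing it with the boto3 SageMaker client:

[ ]:
# Check the final status of the Data Wrangler processing job
sm_client = boto3.client("sagemaker", endpoint_url=sagemaker_endpoint_url)
job_desc = sm_client.describe_processing_job(ProcessingJobName=processing_job_name)
print(job_desc["ProcessingJobStatus"])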

Data Cleaning with Data Wrangler

Loading, Preparation, EDA, and Preprocessing


For the initial data preparation and exploration, we will utilize SageMaker’s Data Wrangler feature to load the data and apply some transformations. In the Data Wrangler GUI, we perform the steps listed below (a rough pandas equivalent of the join and encoding steps is sketched after the list). Note that because this data is generated, it is relatively clean and few data cleaning steps are needed. After completing these steps, you can uncomment and run the code below to inspect your cleaned data.

  1. Load fleet sensor logs data from S3

  2. Load fleet details data from S3

  3. Change column data types

  4. Change column headers

  5. Check for Null/NA values (impute or drop)

  6. Join sensor and details data

  7. One-hot encode categorical features

  8. Do preliminary analysis using the built-in analysis feature

  9. Export the recipe as a SageMaker Data Wrangler job

  10. Upload the final cleaned data set to S3
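
For reference, a rough pandas equivalent of the join and one-hot-encoding steps might look like the commented sketch below. The raw file names are hypothetical, and the categorical column names (make_code, model_code, vehicle_class_code, engine_type_code) are inferred from the encoded column names that appear in the cleaned data, so treat this as an illustration of what the Data Wrangler flow does rather than a drop-in replacement for it.

[ ]:
# Illustrative sketch only -- the raw file names below are hypothetical
# import pandas as pd

# logs = pd.read_csv("fleet_sensor_logs.csv")  # hypothetical raw sensor log file
# details = pd.read_csv("fleet_details.csv")  # hypothetical fleet details file

# # Join sensor readings to vehicle details on the shared vehicle_id column
# fleet_raw = logs.merge(details, on="vehicle_id", how="left")

# # One-hot encode the categorical code columns, mirroring the Data Wrangler step
# fleet_raw = pd.get_dummies(
#     fleet_raw,
#     columns=["make_code", "model_code", "vehicle_class_code", "engine_type_code"],
# )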

For our purposes, we will download the final cleaned data set from S3 into our SageMaker Studio instance. For more information on how to load and preprocess tabular data, see the Tabular Preprocessing blog. For additional information on preprocessing for PrM, please refer to the blog post On the relevance of preprocessing in predictive maintenance for dynamic systems.

[ ]:
# fleet = wr.s3.read_csv(path=dw_output_path_prm, dataset=True)
[ ]:
# # add in additional features and change data types
# fleet["datetime"] = pd.to_datetime(fleet["datetime"], format="%Y-%m-%d %H:%M:%S")
# fleet["cycle"] = fleet.groupby("vehicle_id")["datetime"].rank("dense")
# fleet["make"] = fleet["make"].astype("category")
# fleet["model"] = fleet["model"].astype("category")
# fleet["vehicle_class"] = fleet["vehicle_class"].astype("category")
# fleet["engine_type"] = fleet["engine_type"].astype("category")
# fleet["engine_age"] = fleet["datetime"].dt.year - fleet["year"]
[ ]:
# fleet = fleet[
#     [
#         "target",
#         "vehicle_id",
#         "datetime",
#         "make",
#         "model",
#         "year",
#         "vehicle_class",
#         "engine_type",
#         "make_code_Make A",
#         "make_code_Make B",
#         "make_code_Make E",
#         "make_code_Make C",
#         "make_code_Make D",
#         "model_code_Model E1",
#         "model_code_Model A4",
#         "model_code_Model B1",
#         "model_code_Model B2",
#         "model_code_Model A2",
#         "model_code_Model A3",
#         "model_code_Model B3",
#         "model_code_Model C2",
#         "model_code_Model A1",
#         "model_code_Model A5",
#         "model_code_Model A6",
#         "model_code_Model C1",
#         "model_code_Model D1",
#         "model_code_Model E2",
#         "vehicle_class_code_Truck-Tractor",
#         "vehicle_class_code_Truck",
#         "vehicle_class_code_Bus",
#         "vehicle_class_code_Transport",
#         "engine_type_code_Engine E",
#         "engine_type_code_Engine C",
#         "engine_type_code_Engine B",
#         "engine_type_code_Engine F",
#         "engine_type_code_Engine H",
#         "engine_type_code_Engine D",
#         "engine_type_code_Engine A",
#         "engine_type_code_Engine G",
#         "voltage",
#         "current",
#         "resistance",
#         "cycle",
#         "engine_age",
#     ]
# ]
[ ]:
# fleet.sort_values(by=["vehicle_id", "datetime"], inplace=True)
# fleet.to_csv("fleet_data.csv", index=False)
# fleet.shape

If you followed the above steps correctly, your data should match that of the existing fleet_data.csv. It should also fit the following key observations:

  • There are 90 vehicles in the fleet

  • The data has 9,000 observations and 44 columns.

  • Vehicles can be identified using the ‘vehicle_id’ column.

  • The label column, called ‘Target’, is an indicator of failure (‘0’ = No Failure; ‘1’ = Failure).

  • There are 4 numeric features available for prediction and 4 categorical features. We will expand upon these later in the Feature Engineering section of this notebook.
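
A quick way to spot-check these observations against the cleaned file is sketched below; the column names (vehicle_id, target) are taken from the column list in the commented cells above.

[ ]:
# Spot-check the key observations against the cleaned data set
# import pandas as pd

# fleet_check = pd.read_csv("fleet_data.csv")
# print(fleet_check.shape)  # expect (9000, 44)
# print(fleet_check["vehicle_id"].nunique())  # expect 90 vehicles
# print(fleet_check["target"].value_counts())  # failure vs. no-failure counts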

Kick off SageMaker Training Job (Optional)

Data Wrangler is a SageMaker tool for processing data to be used for machine learning. Now that the data has been processed, users will want to train a model using the data. The following shows an example of doing so using the popular XGBoost algorithm.

It is important to note that the XGBoost objective ([‘binary’, ‘regression’, ‘multiclass’]), hyperparameters, or content_type used below may not be suitable for the output data and may require changes to train a proper model. Furthermore, for CSV training, the algorithm assumes that the target variable is in the first column. For more information on SageMaker XGBoost, please see XGBoost Algorithm.

The cell below searches the output prefix to find the location of the processed data.

[ ]:
s3_client = boto3.client("s3")
list_response = s3_client.list_objects_v2(Bucket=bucket, Prefix=output_prefix)

training_path = None

# Pick the processed data object, skipping the empty _SUCCESS marker file
for content in list_response["Contents"]:
    if "_SUCCESS" not in content["Key"]:
        training_path = content["Key"]

print(training_path)

Next, the Training Job hyperparameters are set. For more information on XGBoost Hyperparameters, see XGBoost Parameters.

[ ]:
region = boto3.Session().region_name
container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-1")
hyperparameters = {
    "max_depth": "5",
    "objective": "reg:squarederror",
    "num_round": "10",
}
train_content_type = (
    "application/x-parquet" if output_content_type.upper() == "PARQUET" else "text/csv"
)
train_input = sagemaker.inputs.TrainingInput(
    s3_data=f"s3://{bucket}/{training_path}",
    content_type=train_content_type,
)
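
Note that the fleet label is a binary failure indicator, so a configuration better matched to this data set would likely use a binary classification objective instead of the regression objective above; a minimal, commented-out sketch:

[ ]:
# Sketch: hyperparameters better suited to binary fault classification
# hyperparameters = {
#     "max_depth": "5",
#     "objective": "binary:logistic",
#     "eval_metric": "auc",
#     "num_round": "10",
# }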

The TrainingJob configuration is set using the SageMaker Python SDK Estimator, which is then fit using the training data from the ProcessingJob that was run earlier.

[ ]:
estimator = sagemaker.estimator.Estimator(
    container,
    iam_role,
    hyperparameters=hyperparameters,
    instance_count=1,
    instance_type="ml.m5.2xlarge",
)
estimator.fit({"train": train_input})
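
Once training completes, the S3 URI of the resulting model artifact is available on the estimator and can be reused later, for example in a Batch Transform job in the follow-on notebooks:

[ ]:
# S3 location of the trained model artifact
print(estimator.model_data)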