Amazon SageMaker Batch Transform: Associate prediction results with their corresponding input records




Use SageMaker’s XGBoost algorithm to train a binary classification model and, for a list of tumors in a batch file, predict whether each is malignant

It also shows, in detail, how to use the input/output joining and filtering features of Batch Transform


Background

The purpose of this notebook is to train a model using SageMaker’s XGBoost algorithm and UCI’s breast cancer diagnostic data set, and to illustrate how to run batch inferences and how to use the Batch Transform I/O join feature. UCI’s breast cancer diagnostic data set is available at https://archive.ics.uci.edu/ml/datasets/Breast+Cancer+Wisconsin+%28Diagnostic%29. The data set is also available on Kaggle at https://www.kaggle.com/uciml/breast-cancer-wisconsin-data. The goal is to use this data set to build a predictive model of whether a breast mass image indicates a benign or malignant tumor.


Setup

Let’s start by specifying:

  • The SageMaker role ARN used to give training and batch transform access to your data. The snippet below uses the same role as your SageMaker notebook instance. Otherwise, specify the full ARN of a role with the SageMakerFullAccess policy attached.

  • The S3 bucket that you want to use for training and storing model objects.

[1]:
import os
import boto3
import sagemaker

role = sagemaker.get_execution_role()

bucket = sagemaker.Session().default_bucket()
prefix = "DEMO-breast-cancer-prediction-xgboost-lowlevel"

Data sources

Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

Breast Cancer Wisconsin (Diagnostic) Data Set [https://archive.ics.uci.edu/ml/datasets/Breast+Cancer+Wisconsin+(Diagnostic)].

Also see: Breast Cancer Wisconsin (Diagnostic) Data Set [https://www.kaggle.com/uciml/breast-cancer-wisconsin-data].

Data preparation

Let’s download the data, save it in the local folder as data.csv, and take a look at it.

[2]:
import pandas as pd
import numpy as np

s3 = boto3.client("s3")

filename = "wdbc.csv"
s3.download_file(
    f"sagemaker-example-files-prod-{boto3.session.Session().region_name}",
    "datasets/tabular/breast_cancer/wdbc.csv",
    filename,
)
data = pd.read_csv(filename, header=None)

# specify columns extracted from wdbc.names
data.columns = [
    "id",
    "diagnosis",
    "radius_mean",
    "texture_mean",
    "perimeter_mean",
    "area_mean",
    "smoothness_mean",
    "compactness_mean",
    "concavity_mean",
    "concave points_mean",
    "symmetry_mean",
    "fractal_dimension_mean",
    "radius_se",
    "texture_se",
    "perimeter_se",
    "area_se",
    "smoothness_se",
    "compactness_se",
    "concavity_se",
    "concave points_se",
    "symmetry_se",
    "fractal_dimension_se",
    "radius_worst",
    "texture_worst",
    "perimeter_worst",
    "area_worst",
    "smoothness_worst",
    "compactness_worst",
    "concavity_worst",
    "concave points_worst",
    "symmetry_worst",
    "fractal_dimension_worst",
]

# save the data
data.to_csv("data.csv", sep=",", index=False)

data.sample(8)
[2]:
id diagnosis radius_mean texture_mean perimeter_mean area_mean smoothness_mean compactness_mean concavity_mean concave points_mean ... radius_worst texture_worst perimeter_worst area_worst smoothness_worst compactness_worst concavity_worst concave points_worst symmetry_worst fractal_dimension_worst
91 861799 M 15.370 22.76 100.20 728.2 0.09200 0.10360 0.11220 0.07483 ... 16.43 25.84 107.50 830.9 0.1257 0.1997 0.2846 0.14760 0.2556 0.06828
395 903811 B 14.060 17.18 89.75 609.1 0.08045 0.05361 0.02681 0.03251 ... 14.92 25.34 96.42 684.5 0.1066 0.1231 0.0846 0.07911 0.2523 0.06609
224 8813129 B 13.270 17.02 84.55 546.4 0.08445 0.04994 0.03554 0.02456 ... 15.14 23.60 98.84 708.8 0.1276 0.1311 0.1786 0.09678 0.2506 0.07623
406 905189 B 16.140 14.86 104.30 800.0 0.09495 0.08501 0.05500 0.04528 ... 17.71 19.58 115.90 947.9 0.1206 0.1722 0.2310 0.11290 0.2778 0.07012
388 903011 B 11.270 15.50 73.38 392.0 0.08365 0.11140 0.10070 0.02757 ... 12.04 18.93 79.73 450.0 0.1102 0.2809 0.3021 0.08272 0.2157 0.10430
137 868682 B 11.430 15.39 73.06 399.8 0.09639 0.06889 0.03503 0.02875 ... 12.32 22.02 79.93 462.0 0.1190 0.1648 0.1399 0.08476 0.2676 0.06765
68 859471 B 9.029 17.33 58.79 250.5 0.10660 0.14130 0.31300 0.04375 ... 10.31 22.65 65.50 324.7 0.1482 0.4365 1.2520 0.17500 0.4228 0.11750
329 895633 M 16.260 21.88 107.50 826.8 0.11650 0.12830 0.17990 0.07981 ... 17.73 25.21 113.70 975.2 0.1426 0.2116 0.3344 0.10470 0.2736 0.07953

8 rows × 32 columns

Key observations:

  • The data has 569 observations and 32 columns.

  • The first field is the ‘id’ attribute, which we will want to drop before batch inference and then add back to the final inference output next to the probability of malignancy.

  • The second field, ‘diagnosis’, is an indicator of the actual diagnosis (‘M’ = Malignant; ‘B’ = Benign).

  • There are 30 other numeric features that we will use for training and inference.
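
As a quick check of these observations, we can print the data’s shape and the diagnosis label counts (a small optional snippet, relying only on the data DataFrame loaded above):

[ ]:
# sanity-check the observations above: 569 rows, 32 columns, and the 'M'/'B' label counts
print(data.shape)
print(data["diagnosis"].value_counts())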

Let’s replace the M/B diagnosis with a 1/0 boolean value.

[3]:
data["diagnosis"] = (data["diagnosis"] == "M").astype(int)
data.sample(8)
[3]:
id diagnosis radius_mean texture_mean perimeter_mean area_mean smoothness_mean compactness_mean concavity_mean concave points_mean ... radius_worst texture_worst perimeter_worst area_worst smoothness_worst compactness_worst concavity_worst concave points_worst symmetry_worst fractal_dimension_worst
5 843786 1 12.45 15.70 82.57 477.1 0.12780 0.17000 0.15780 0.08089 ... 15.47 23.75 103.40 741.6 0.1791 0.52490 0.53550 0.17410 0.3985 0.12440
311 89382601 0 14.61 15.69 92.68 664.9 0.07618 0.03515 0.01447 0.01877 ... 16.46 21.75 103.70 840.8 0.1011 0.07087 0.04746 0.05813 0.2530 0.05695
177 87281702 1 16.46 20.11 109.30 832.9 0.09831 0.15560 0.17930 0.08866 ... 17.79 28.45 123.50 981.2 0.1415 0.46670 0.58620 0.20350 0.3054 0.09519
129 866674 1 19.79 25.12 130.40 1192.0 0.10150 0.15890 0.25450 0.11490 ... 22.63 33.58 148.70 1589.0 0.1275 0.38610 0.56730 0.17320 0.3305 0.08465
515 916221 0 11.34 18.61 72.76 391.2 0.10490 0.08499 0.04302 0.02594 ... 12.47 23.03 79.15 478.6 0.1483 0.15740 0.16240 0.08542 0.3060 0.06783
325 89511502 0 12.67 17.30 81.25 489.9 0.10280 0.07664 0.03193 0.02107 ... 13.71 21.10 88.70 574.4 0.1384 0.12120 0.10200 0.05602 0.2688 0.06888
279 8911834 0 13.85 15.18 88.99 587.4 0.09516 0.07688 0.04479 0.03711 ... 14.98 21.74 98.37 670.0 0.1185 0.17240 0.14560 0.09993 0.2955 0.06912
323 895100 1 20.34 21.51 135.90 1264.0 0.11700 0.18750 0.25650 0.15040 ... 25.30 31.86 171.10 1938.0 0.1592 0.44920 0.53440 0.26850 0.5558 0.10240

8 rows × 32 columns

Let’s split the data as follows: 80% for training, 10% for validation, and 10% set aside for our batch inference job. In addition, let’s drop the ‘id’ field from the training and validation sets, as ‘id’ is not a training feature. For our batch set, however, we keep the ‘id’ feature: we’ll filter it out prior to running inferences so that the input features match those of the training set, and then ultimately join it back with the inference results. We do drop the ‘diagnosis’ attribute from the batch set, since this is what we’ll try to predict.

[4]:
# data split in three sets, training, validation and batch inference
rand_split = np.random.rand(len(data))
train_list = rand_split < 0.8
val_list = (rand_split >= 0.8) & (rand_split < 0.9)
batch_list = rand_split >= 0.9

data_train = data[train_list].drop(["id"], axis=1)
data_val = data[val_list].drop(["id"], axis=1)
data_batch = data[batch_list].drop(["diagnosis"], axis=1)
data_batch_noID = data_batch.drop(["id"], axis=1)

Let’s upload those data sets to S3

[5]:
s3_resource = boto3.Session().resource("s3")

train_file = "train_data.csv"
data_train.to_csv(train_file, index=False, header=False)
with open(train_file, "rb") as data:
    s3_resource.Bucket(bucket).upload_fileobj(data, os.path.join(prefix, "train", train_file))

validation_file = "validation_data.csv"
data_val.to_csv(validation_file, index=False, header=False)
with open(validation_file, "rb") as data:
    s3_resource.Bucket(bucket).upload_fileobj(
        data, os.path.join(prefix, "validation", validation_file)
    )

batch_file = "batch_data.csv"
data_batch.to_csv(batch_file, index=False, header=False)
with open(batch_file, "rb") as data:
    s3_resource.Bucket(bucket).upload_fileobj(data, os.path.join(prefix, "batch", batch_file))

batch_file_noID = "batch_data_noID.csv"
data_batch_noID.to_csv(batch_file_noID, index=False, header=False)
with open(batch_file_noID, "rb") as data:
    s3_resource.Bucket(bucket).upload_fileobj(data, os.path.join(prefix, "batch", batch_file_noID))

Training job and model creation

The cell below uses the Boto3 SDK to kick off the training job using both our training set and validation set. Note that the objective is set to ‘binary:logistic’, which trains a model to output a probability between 0 and 1 (here, the probability of a tumor being malignant).

[ ]:
%%time
from time import gmtime, strftime
from sagemaker.image_uris import retrieve

job_name = "xgb-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("Training job", job_name)

image = retrieve(framework="xgboost", region=boto3.Session().region_name, version="latest")

output_location = "s3://{}/{}/output/{}".format(bucket, prefix, job_name)
print("Training artifacts will be uploaded to: {}".format(output_location))

create_training_params = {
    "AlgorithmSpecification": {"TrainingImage": image, "TrainingInputMode": "File"},
    "RoleArn": role,
    "OutputDataConfig": {"S3OutputPath": output_location},
    "ResourceConfig": {"InstanceCount": 1, "InstanceType": "ml.m5.4xlarge", "VolumeSizeInGB": 50},
    "TrainingJobName": job_name,
    "HyperParameters": {
        "objective": "binary:logistic",
        "max_depth": "5",
        "eta": "0.2",
        "gamma": "4",
        "min_child_weight": "6",
        "subsample": "0.8",
        "silent": "0",
        "num_round": "100",
    },
    "StoppingCondition": {"MaxRuntimeInSeconds": 60 * 60},
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": "s3://{}/{}/train".format(bucket, prefix),
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None",
            "ContentType": "text/csv",
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": "s3://{}/{}/validation".format(bucket, prefix),
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None",
            "ContentType": "text/csv",
        },
    ],
}

# note: from here on, sagemaker refers to the boto3 SageMaker client (this shadows the sagemaker SDK module imported earlier)
sagemaker = boto3.client("sagemaker")
sagemaker.create_training_job(**create_training_params)
status = sagemaker.describe_training_job(TrainingJobName=job_name)["TrainingJobStatus"]
print(status)

try:
    sagemaker.get_waiter("training_job_completed_or_stopped").wait(TrainingJobName=job_name)
finally:
    status = sagemaker.describe_training_job(TrainingJobName=job_name)["TrainingJobStatus"]
    print("Training job ended with status: " + status)
    if status == "Failed":
        message = sagemaker.describe_training_job(TrainingJobName=job_name)["FailureReason"]
        print("Training failed with the following error: {}".format(message))
        raise Exception("Training job failed")

Let’s create a model based on our training job.

The cell below creates a model in SageMaker based on the training job we just executed. The model can later be deployed using the SageMaker hosting services or, in our case, used in a Batch Transform job.

[ ]:
%%time

model_name = job_name
print(model_name)

info = sagemaker.describe_training_job(TrainingJobName=job_name)
model_data = info["ModelArtifacts"]["S3ModelArtifacts"]

primary_container = {"Image": image, "ModelDataUrl": model_data}

create_model_response = sagemaker.create_model(
    ModelName=model_name, ExecutionRoleArn=role, PrimaryContainer=primary_container
)

print(create_model_response["ModelArn"])

Batch Transform

In SageMaker Batch Transform, we introduced a new attribute called DataProcessing. In the cells below, we use the Boto3 SDK to kick off several Batch Transform jobs using different configurations of DataProcessing. Please refer to Associate Prediction Results with Input Records to learn more about how to use the DataProcessing attribute.
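
For reference, DataProcessing has three fields: InputFilter and OutputFilter take JSONPath expressions that select columns, and JoinSource controls whether the original input is joined to the predictions. The dictionary below (example_data_processing is just an illustrative name) sketches the shape of the attribute; the cells that follow set these fields on the actual transform requests.

[ ]:
# illustrative shape of the DataProcessing attribute; the cells below set these
# fields on the real create_transform_job request
example_data_processing = {
    "InputFilter": "$[1:]",   # JSONPath selecting which input columns are sent to the model
    "JoinSource": "Input",    # join each original input record with its prediction
    "OutputFilter": "$",      # JSONPath selecting which columns of the joined record are written to S3
}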

1. Without data processing

Let’s first leave the DataProcessing fields unset and inspect the inference results. We’ll use this as a baseline to compare against the results with data processing.

[ ]:
%%time

from time import gmtime, strftime

batch_job_name = "Batch-Transform-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
input_location = "s3://{}/{}/batch/{}".format(
    bucket, prefix, batch_file_noID
)  # use input data without ID column
output_location = "s3://{}/{}/output/{}".format(bucket, prefix, batch_job_name)

request = {
    "TransformJobName": batch_job_name,
    "ModelName": job_name,
    "TransformOutput": {
        "S3OutputPath": output_location,
        "Accept": "text/csv",
        "AssembleWith": "Line",
    },
    "TransformInput": {
        "DataSource": {"S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": input_location}},
        "ContentType": "text/csv",
        "SplitType": "Line",
        "CompressionType": "None",
    },
    "TransformResources": {"InstanceType": "ml.m4.xlarge", "InstanceCount": 1},
}

sagemaker.create_transform_job(**request)
print("Created Transform job with name: ", batch_job_name)

# Wait until the job finishes
try:
    sagemaker.get_waiter("transform_job_completed_or_stopped").wait(TransformJobName=batch_job_name)
finally:
    response = sagemaker.describe_transform_job(TransformJobName=batch_job_name)
    status = response["TransformJobStatus"]
    print("Transform job ended with status: " + status)
    if status == "Failed":
        message = response["FailureReason"]
        print("Transform failed with the following error: {}".format(message))
        raise Exception("Transform job failed")

Let’s inspect the output of the Batch Transform job in S3. It should show the list of probabilities of the tumors being malignant.

[ ]:
import re


def get_csv_output_from_s3(s3uri, batch_file):
    # Batch Transform writes one output object per input file, named "<input file>.out"
    file_name = "{}.out".format(batch_file)
    match = re.match("s3://([^/]+)/(.*)", "{}/{}".format(s3uri, file_name))
    output_bucket, output_prefix = match.group(1), match.group(2)
    s3.download_file(output_bucket, output_prefix, file_name)
    return pd.read_csv(file_name, sep=",", header=None)
[ ]:
output_df = get_csv_output_from_s3(output_location, batch_file_noID)
output_df.head(8)

2. Join the input and the prediction results

Now, let’s use the new feature to associate the prediction results with their corresponding input records. We can also use InputFilter to exclude the ID column easily, so there’s no need to keep a separate file without the ID in S3.

  • Set InputFilter to “$[1:]”: this indicates that we exclude column 0 (the ‘ID’) before processing the inferences and keep everything from column 1 to the last column (all the features, or predictors).

  • Set JoinSource to “Input”: this indicates that we want to join the input data with the inference results.

  • Leave OutputFilter at its default (“$”), indicating that the joined input and inference results will be saved as the output.

[ ]:
%%time

batch_job_name = "Batch-Transform-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
input_location = "s3://{}/{}/batch/{}".format(
    bucket, prefix, batch_file
)  # use the input data with the ID column, because InputFilter will filter it out
output_location = "s3://{}/{}/output/{}".format(bucket, prefix, batch_job_name)

request["TransformJobName"] = batch_job_name
request["TransformInput"]["DataSource"]["S3DataSource"]["S3Uri"] = input_location
request["TransformOutput"]["S3OutputPath"] = output_location

request["DataProcessing"] = {
    "InputFilter": "$[1:]",  # exclude the ID column (index 0)
    "JoinSource": "Input",  # join the input with the inference results
}

sagemaker.create_transform_job(**request)
print("Created Transform job with name: ", batch_job_name)

# Wait until the job finishes
try:
    sagemaker.get_waiter("transform_job_completed_or_stopped").wait(TransformJobName=batch_job_name)
finally:
    response = sagemaker.describe_transform_job(TransformJobName=batch_job_name)
    status = response["TransformJobStatus"]
    print("Transform job ended with status: " + status)
    if status == "Failed":
        message = response["FailureReason"]
        print("Transform failed with the following error: {}".format(message))
        raise Exception("Transform job failed")

Let’s inspect the output of the Batch Transform job in S3. It should show the list of tumors identified by their original feature columns and their corresponding probabilities of being malignant.

[ ]:
output_df = get_csv_output_from_s3(output_location, batch_file)
output_df.head(8)

3. Update the output filter to keep only ID and prediction results

Let’s change the output filter to “$[0,-1]”, indicating that when presenting the output, we only want to keep column 0 (the ‘ID’) and the last column (the inference result, i.e. the probability of a given tumor being malignant).

[ ]:
%%time

batch_job_name = "Batch-Transform-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
output_location = "s3://{}/{}/output/{}".format(bucket, prefix, batch_job_name)

request["TransformJobName"] = batch_job_name
request["TransformOutput"]["S3OutputPath"] = output_location
request["DataProcessing"][
    "OutputFilter"
] = "$[0, -1]"  # keep the first and last column of the joined output

sagemaker.create_transform_job(**request)
print("Created Transform job with name: ", batch_job_name)

# Wait until the job finishes
try:
    sagemaker.get_waiter("transform_job_completed_or_stopped").wait(TransformJobName=batch_job_name)
finally:
    response = sagemaker.describe_transform_job(TransformJobName=batch_job_name)
    status = response["TransformJobStatus"]
    print("Transform job ended with status: " + status)
    if status == "Failed":
        message = response["FailureReason"]
        print("Transform failed with the following error: {}".format(message))
        raise Exception("Transform job failed")

Now, let’s inspect the output of the Batch Transform job in S3 again. It should show two columns: the IDs and their corresponding probabilities of being malignant.

[ ]:
output_df = get_csv_output_from_s3(output_location, batch_file)
output_df.head(8)
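
Since the original data frame, including the diagnosis labels we dropped from the batch set, is still in memory, we can optionally sanity-check the predictions. The sketch below assumes output_df holds exactly the two columns (ID and probability) produced by the OutputFilter above:

[ ]:
# optional sanity check: join the (ID, probability) output back to the held-out labels
scored = output_df.copy()
scored.columns = ["id", "probability"]
scored = scored.merge(data[["id", "diagnosis"]], on="id")
scored["predicted"] = (scored["probability"] > 0.5).astype(int)
print("Accuracy on the batch set:", (scored["predicted"] == scored["diagnosis"]).mean())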

In summary, we can use DataProcessing to:

  1. Filter / select useful features from the input dataset, e.g. exclude the ID columns.

  2. Associate the prediction results with their corresponding input records.

  3. Filter the original or joined results before saving to S3, e.g. keep only the ID and probability columns.
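
Putting these together, a single DataProcessing configuration covering all three steps could look like the following sketch, reusing the JSONPath expressions from the cells above:

[ ]:
# a combined DataProcessing configuration covering the three use cases listed above
data_processing_summary = {
    "InputFilter": "$[1:]",     # 1. drop the ID column before invoking the model
    "JoinSource": "Input",      # 2. join each input record with its prediction
    "OutputFilter": "$[0,-1]",  # 3. keep only the ID and the predicted probability
}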
