SageMaker Batch Transform custom TensorFlow inference.py (CSV & TFRecord)




Introduction

This notebook trains a simple classifier on the Iris dataset. Training is completed locally on the machine where this notebook is executed. A custom inference.py script for CSV and TFRecord is used for hosting our model in a Batch Transform Job. The TFRecord data is generated from the CSV data.

Table of Contents:

* Prerequisites
* Training the Network Locally
* Set the model up for hosting
* Write a custom inference.py script
* Create Batch Transform Job

Prerequisites

Packages and Permissions

Here, we install the SageMaker SDK and import the TensorFlow version used to train locally. We specify a TensorFlow version again when we host the model (the framework_version argument of TensorFlowModel), and the two should be compatible. The SageMaker SDK uses the SageMaker default S3 bucket when needed. If get_execution_role does not return a role with the appropriate permissions, specify the ARN of an IAM role that does. Make sure the AmazonSageMakerFullAccess policy is attached to the execution role you are using.

[ ]:
!pip install --upgrade sagemaker
[ ]:
import boto3
import numpy as np
import os
import pandas as pd
import re
import sagemaker
from sagemaker.tensorflow import TensorFlowModel
import shutil
import tarfile
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Input, Dense
from datetime import datetime

role = sagemaker.get_execution_role()
sm_session = sagemaker.Session()
bucket_name = sm_session.default_bucket()

Model Definitions

For this example, we use a very simple network architecture: three densely connected hidden layers followed by a three-unit softmax output layer.

[ ]:
def iris_mlp(metrics):
    ### Setup loss and output node activation
    output_activation = "softmax"
    loss = "sparse_categorical_crossentropy"

    input = Input(shape=(4,), name="input")

    x = Dense(
        units=10,
        kernel_regularizer=tf.keras.regularizers.l2(0.001),
        activation="relu",
        name="dense_layer1",
    )(input)

    x = Dense(
        units=20,
        kernel_regularizer=tf.keras.regularizers.l2(0.001),
        activation="relu",
        name="dense_layer2",
    )(x)

    x = Dense(
        units=10,
        activation="relu",
        kernel_regularizer=tf.keras.regularizers.l2(0.001),
        name="dense_layer3",
    )(x)

    output = Dense(units=3, activation=output_activation)(x)

    ### Compile the model
    model = tf.keras.Model(input, output)

    model.compile(optimizer="adam", loss=loss, metrics=metrics)

    return model
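
As a quick sanity check on the architecture above, the number of trainable parameters can be worked out by hand. The sketch below assumes plain fully connected layers with a bias term (the Keras default for Dense) and uses a hypothetical helper name, dense_param_count, that is not part of the notebook.

```python
def dense_param_count(layer_sizes):
    """Count weights and biases for a stack of fully connected layers."""
    total = 0
    for n_in, n_out in zip(layer_sizes, layer_sizes[1:]):
        total += n_in * n_out + n_out  # weight matrix plus one bias per unit
    return total


# 4 input features -> 10 -> 20 -> 10 hidden units -> 3 output classes
iris_layer_sizes = [4, 10, 20, 10, 3]
iris_param_count = dense_param_count(iris_layer_sizes)
print(iris_param_count)  # 513
```

The total should match the trainable parameter count that model.summary() reports for the model returned by iris_mlp.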

Data Setup

We’ll use the pre-processed Iris training and test data stored in the sagemaker-example-files-prod-{region} public S3 bucket.

[ ]:
# Download Iris test and train data sets from S3
SOURCE_DATA_BUCKET = f"sagemaker-example-files-prod-{sm_session.boto_region_name}"
SOURCE_DATA_PREFIX = "datasets/tabular/iris"
sm_session.download_data(".", bucket=SOURCE_DATA_BUCKET, key_prefix=SOURCE_DATA_PREFIX)

# Load the training and test data from .csv to a Pandas data frame.
train_df = pd.read_csv(
    "iris_train.csv",
    header=0,
    names=["sepal_length", "sepal_width", "petal_length", "petal_width", "class"],
)
test_df = pd.read_csv(
    "iris_test.csv",
    header=0,
    names=["sepal_length", "sepal_width", "petal_length", "petal_width", "class"],
)

# Pop the record labels into N x 1 Numpy arrays
train_labels = np.array(train_df.pop("class"))
test_labels = np.array(test_df.pop("class"))

# Save the remaining features as Numpy arrays
train_np = np.array(train_df)
test_np = np.array(test_df)

Training the Network Locally

Here, we train the network using the Keras fit method. This should take only a few seconds because the model is very simple.

[ ]:
EPOCHS = 50
BATCH_SIZE = 32

EARLY_STOPPING = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss", mode="auto", restore_best_weights=True
)

# Instantiate classifier (binary_accuracy is not meaningful for three integer-labelled classes)
classifier = iris_mlp(metrics=["accuracy"])

# Fit classifier
history = classifier.fit(
    x=train_np,
    y=train_labels,
    validation_data=(test_np, test_labels),
    callbacks=[EARLY_STOPPING],
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
)

Set the model up for hosting

Export the model from TensorFlow

The SageMaker TensorFlow Serving container expects the model artifacts to be organized under a numeric version directory, in the following format:

1
├── keras_metadata.pb
├── saved_model.pb
└── variables
    ├── variables.data-00000-of-00001
    └── variables.index
[ ]:
classifier.save("1")
with tarfile.open("model.tar.gz", "w:gz") as tar:
    tar.add("1")
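
Before uploading, it can be worth confirming that the archive keeps the numeric version directory at its root. The following is a minimal sketch using only the standard library; it builds a placeholder directory instead of a real SavedModel so it can run anywhere, and top_level_entries is a hypothetical helper name.

```python
import os
import tarfile
import tempfile


def top_level_entries(tar_path):
    """Return the set of top-level names inside a gzipped tar archive."""
    with tarfile.open(tar_path, "r:gz") as tar:
        return {member.name.split("/")[0] for member in tar.getmembers()}


# Build a stand-in for the exported model (placeholder files, not a real SavedModel).
with tempfile.TemporaryDirectory() as workdir:
    version_dir = os.path.join(workdir, "1")
    os.makedirs(os.path.join(version_dir, "variables"))
    with open(os.path.join(version_dir, "saved_model.pb"), "wb") as f:
        f.write(b"placeholder")

    demo_tar = os.path.join(workdir, "model.tar.gz")
    with tarfile.open(demo_tar, "w:gz") as tar:
        tar.add(version_dir, arcname="1")  # keep the version folder at the archive root

    entries = top_level_entries(demo_tar)
    has_numeric_version_dir = all(name.isdigit() for name in entries)

print(entries, has_numeric_version_dir)
```

Running top_level_entries("model.tar.gz") against the archive created above should report only the "1" directory.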

Next, we upload the model archive to the default SageMaker S3 bucket using the sagemaker.Session.upload_data method.

[ ]:
s3_response = sm_session.upload_data("model.tar.gz", bucket=bucket_name, key_prefix="model")
s3_response

View model input tensor shape

We use saved_model_cli to view the model’s input tensors, which helps us build our custom inference.py script.

The output of the following cell shows that our model expects input of shape (-1, 4): any number of rows, each with four features.

[ ]:
!saved_model_cli show --all --dir {"1"}
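
Because the model expects rows of exactly four features, a small check on the batch data can catch malformed records before a Transform Job is started. This is an illustrative sketch only; validate_rows is a hypothetical helper and is not used elsewhere in the notebook.

```python
def validate_rows(rows, n_features=4):
    """Raise ValueError if any row does not have exactly n_features numeric values."""
    for index, row in enumerate(rows):
        if len(row) != n_features:
            raise ValueError(f"row {index} has {len(row)} values, expected {n_features}")
        for value in row:
            float(value)  # raises ValueError or TypeError on non-numeric input
    return len(rows)


batch = [[5.1, 3.5, 1.4, 0.2], [6.2, 2.9, 4.3, 1.3]]
n_valid = validate_rows(batch)
print(n_valid)  # 2
```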

CSV Example Batch Data

Below, we view the sample CSV data that will be used as input to our Transform Job.

[ ]:
!head Data/batch-iris-data.csv
[ ]:
s3_csv_data = "s3://{}/datasets/batch-iris-data.csv".format(bucket_name)
s3_csv_data

Upload CSV input data to S3

[ ]:
!aws s3 cp Data/batch-iris-data.csv $s3_csv_data

TFRecord Example Batch Data

We use the CSV data to generate TFRecord data that will also be used for inference. The CSV and TFRecord files contain identical data, expressed in different formats.

[ ]:
csv = pd.read_csv("Data/batch-iris-data.csv", header=None).values

with tf.io.TFRecordWriter("Data/batch-iris-data.tfrecords") as writer:
    for row in csv:
        features = row[:]
        example = tf.train.Example()
        example.features.feature["features"].float_list.value.extend(features)

        writer.write(example.SerializeToString())
[ ]:
raw_dataset = tf.data.TFRecordDataset("Data/batch-iris-data.tfrecords")

for raw_record in raw_dataset.take(1):
    example = tf.train.Example()
    example.ParseFromString(raw_record.numpy())
    print(example)
[ ]:
s3_tf_record_data = "s3://{}/datasets/batch-iris-data.tfrecords".format(bucket_name)
s3_tf_record_data

Upload TFRecord input data to S3

[ ]:
!aws s3 cp Data/batch-iris-data.tfrecords $s3_tf_record_data

Write a custom inference.py script

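Our model accepts a tensor of shape (-1, 4), so each request body must be converted into rows of four floats before it is sent to TensorFlow Serving. We therefore handle each content type (text/csv, application/x-tfrecord) separately in the input handler.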

[ ]:
%%writefile inference.py
import json
import os

os.system("pip install numpy tensorflow crcmod")
import numpy as np
import tensorflow as tf
from google.protobuf.json_format import MessageToDict


import crcmod


def _masked_crc32c(value):
    crc = crcmod.predefined.mkPredefinedCrcFun("crc-32c")(value)
    return (((crc >> 15) | (crc << 17)) + 0xA282EAD8) & 0xFFFFFFFF


def read_tfrecords(tfrecords):
    import io
    import struct

    tfrecords_bytes = io.BytesIO(tfrecords)

    examples = []

    while True:
        length_header = 12
        buf = tfrecords_bytes.read(length_header)
        if not buf:
            # reached end of tfrecord buffer, return examples
            return examples

        if len(buf) != length_header:
            raise ValueError("TFrecord is fewer than %d bytes" % length_header)
        length, length_mask = struct.unpack("<QI", buf)
        length_mask_actual = _masked_crc32c(buf[:8])
        if length_mask_actual != length_mask:
            raise ValueError("TFRecord does not contain a valid length mask")

        length_data = length + 4
        buf = tfrecords_bytes.read(length_data)
        if len(buf) != length_data:
            raise ValueError("TFRecord data payload has fewer bytes than specified in header")
        data, data_mask_expected = struct.unpack("<%dsI" % length, buf)
        data_mask_actual = _masked_crc32c(data)
        if data_mask_actual != data_mask_expected:
            raise ValueError("TFRecord has an invalid data crc32c")

        # Deserialize the tf.Example proto
        example = tf.train.Example()
        example.ParseFromString(data)
        example_features = MessageToDict(example)["features"]["feature"]["features"]["floatList"][
            "value"
        ]
        # Extract a feature map from the example object
        examples.append(example_features)


def read_csv(csv):
    return np.array([[float(j) for j in i.split(",")] for i in csv.splitlines()])


def input_handler(data, context):
    """Pre-process request input before it is sent to TensorFlow Serving REST API
    Args:
        data (obj): the request data stream
        context (Context): an object containing request and configuration details
    Returns:
        (dict): a JSON-serializable dict that contains request body and headers
    """

    if context.request_content_type == "text/csv":
        payload = data.read().decode("utf-8")
        inputs = read_csv(payload)

        input = {"inputs": inputs.tolist()}

        return json.dumps(input)

    if context.request_content_type == "application/x-tfrecord":
        payload = data.read()
        examples = read_tfrecords(payload)

        input = {"inputs": examples}

        return json.dumps(input)

    raise ValueError(
        '{{"error": "unsupported content type {}"}}'.format(
            context.request_content_type or "unknown"
        )
    )


def output_handler(data, context):
    """Post-process TensorFlow Serving output before it is returned to the client.
    Args:
        data (obj): the TensorFlow serving response
        context (Context): an object containing request and configuration details
    Returns:
        (bytes, string): data to return to client, response content type
    """

    if data.status_code != 200:
        raise ValueError(data.content.decode("utf-8"))

    response_content_type = context.accept_header
    prediction = data.content
    return prediction, response_content_type
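
The record framing that read_tfrecords walks through (an 8-byte little-endian length, a masked CRC-32C of the length, the payload, and a masked CRC-32C of the payload) can be sketched without any third-party packages. The code below is a dependency-free illustration, not a replacement for inference.py: crc32c here is a slow bitwise implementation standing in for crcmod, and frame_record and unframe_records are hypothetical helper names.

```python
import struct


def crc32c(data):
    """Bitwise CRC-32C (Castagnoli); slow but dependency-free."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ (0x82F63B78 if crc & 1 else 0)
    return crc ^ 0xFFFFFFFF


def masked_crc32c(data):
    """Apply the same rotate-and-add mask that _masked_crc32c uses."""
    crc = crc32c(data)
    return (((crc >> 15) | (crc << 17)) + 0xA282EAD8) & 0xFFFFFFFF


def frame_record(payload):
    """Wrap payload bytes as: length, masked CRC of length, payload, masked CRC of payload."""
    length = struct.pack("<Q", len(payload))
    return (
        length
        + struct.pack("<I", masked_crc32c(length))
        + payload
        + struct.pack("<I", masked_crc32c(payload))
    )


def unframe_records(buffer):
    """Split a byte buffer back into payloads, verifying both checksums."""
    payloads, offset = [], 0
    while offset < len(buffer):
        length, length_crc = struct.unpack_from("<QI", buffer, offset)
        if masked_crc32c(buffer[offset : offset + 8]) != length_crc:
            raise ValueError("invalid length checksum")
        offset += 12
        payload = buffer[offset : offset + length]
        (payload_crc,) = struct.unpack_from("<I", buffer, offset + length)
        if masked_crc32c(payload) != payload_crc:
            raise ValueError("invalid payload checksum")
        payloads.append(payload)
        offset += length + 4
    return payloads


stream = frame_record(b"first") + frame_record(b"second record")
recovered = unframe_records(stream)
print(recovered)
```

In the real files, each payload is a serialized tf.train.Example rather than the short byte strings used here.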

Confirm the input_handler for TFRecord and CSV return the same output

Because the TFRecord and CSV input data are the same, the input_handler should produce identical output for each format.

[ ]:
import inference


class Context:
    def __init__(self, request_content_type):
        self.request_content_type = request_content_type


# Use context managers so the file handles are closed after the handlers read them
with open("Data/batch-iris-data.tfrecords", "rb") as tfrecord_bytes:
    tfrecord_input = inference.input_handler(tfrecord_bytes, Context("application/x-tfrecord"))

with open("Data/batch-iris-data.csv", "rb") as csv_file_bytes:
    csv_input = inference.input_handler(csv_file_bytes, Context("text/csv"))

assert csv_input == tfrecord_input, "CSV and TFRecord output do not match!"
print("CSV and TFRecord output match")
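
The assertion above compares the two JSON strings exactly. TFRecord float lists hold 32-bit floats while the CSV path parses Python floats, so depending on how the values are serialized, a value-by-value comparison with a tolerance may be more robust. The following is a minimal sketch under that assumption; payloads_close is a hypothetical helper and the sample numbers are made up for illustration.

```python
import json
import math


def payloads_close(json_a, json_b, rel_tol=1e-6):
    """Compare two {"inputs": [[...], ...]} JSON payloads value by value."""
    rows_a = json.loads(json_a)["inputs"]
    rows_b = json.loads(json_b)["inputs"]
    if len(rows_a) != len(rows_b):
        return False
    for row_a, row_b in zip(rows_a, rows_b):
        if len(row_a) != len(row_b):
            return False
        if not all(math.isclose(a, b, rel_tol=rel_tol) for a, b in zip(row_a, row_b)):
            return False
    return True


exact = json.dumps({"inputs": [[5.1, 3.5, 1.4, 0.2]]})
# The same row after a round trip through 32-bit floats
rounded = json.dumps(
    {"inputs": [[5.099999904632568, 3.5, 1.399999976158142, 0.20000000298023224]]}
)
same_within_tolerance = payloads_close(exact, rounded)
print(same_within_tolerance)  # True
```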

Create Batch Transform Job

Create the SageMaker TensorFlow Model

First, we create a TensorFlowModel, which specifies the custom inference.py and the TensorFlow version, and points to our model tarball in S3.

[ ]:
from sagemaker.tensorflow import TensorFlowModel

tensorflow_serving_model_batch = TensorFlowModel(
    model_data=f"s3://{bucket_name}/model/model.tar.gz",
    entry_point="inference.py",
    role=role,
    framework_version="2.3.1",
    sagemaker_session=sm_session,
)

CSV Input Transform Job

Create the Transform Job by specifying the S3 CSV input data location and content_type as text/csv.

[ ]:
date = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
date
[ ]:
output_data_path_batch = "s3://{}/output/batch_iris/".format(bucket_name)
output_data_path = output_data_path_batch
batch_instance_count = 1
batch_instance_type = "ml.m5.4xlarge"
concurrency = 5
max_payload_in_mb = 1
split_type = "Line"
batch_strategy = "MultiRecord"
CSV_job_name = "tensorflow-inference-CSV-{}".format(date)


transformer = tensorflow_serving_model_batch.transformer(
    instance_count=batch_instance_count,
    instance_type=batch_instance_type,
    max_concurrent_transforms=concurrency,
    max_payload=max_payload_in_mb,
    strategy=batch_strategy,
    output_path=output_data_path,
)

transformer.transform(
    data=s3_csv_data,
    content_type="text/csv",
    split_type=split_type,
    wait=False,
    job_name=CSV_job_name,
)
print(CSV_job_name)
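
To build intuition for how split_type="Line", the MultiRecord strategy, and max_payload interact, the sketch below greedily packs newline-delimited records into mini-batches under a byte limit. It is an illustration of the idea only and makes no claim about how the SageMaker service implements batching; pack_lines is a hypothetical helper, and the byte limit is tiny so the effect is visible.

```python
def pack_lines(lines, max_payload_bytes):
    """Greedily group newline-terminated records into batches under a byte limit."""
    batches, current, current_size = [], [], 0
    for line in lines:
        record = line.rstrip("\n") + "\n"
        size = len(record.encode("utf-8"))
        if size > max_payload_bytes:
            raise ValueError("a single record exceeds the payload limit")
        if current and current_size + size > max_payload_bytes:
            batches.append("".join(current))
            current, current_size = [], 0
        current.append(record)
        current_size += size
    if current:
        batches.append("".join(current))
    return batches


rows = ["5.1,3.5,1.4,0.2", "4.9,3.0,1.4,0.2", "6.2,2.9,4.3,1.3"]
# Each record is 16 bytes including the newline, so a 40-byte limit fits two per batch.
mini_batches = pack_lines(rows, max_payload_bytes=40)
print(len(mini_batches))  # 2
```

Each mini-batch would reach input_handler as one request body, which is why read_csv parses multiple lines per call.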

TFRecord Input Transform Job

Create the Transform Job by specifying the S3 TFRecord input data location and content_type as application/x-tfrecord.

[ ]:
output_data_path_batch = "s3://{}/output/batch_iris/".format(bucket_name)

output_data_path = output_data_path_batch
batch_instance_count = 1
batch_instance_type = "ml.m5.4xlarge"
concurrency = 5
max_payload_in_mb = 1
split_type = "TFRecord"
batch_strategy = "MultiRecord"
TFRecord_job_name = "tensorflow-inference-TFRecord-{}".format(date)

transformer = tensorflow_serving_model_batch.transformer(
    instance_count=batch_instance_count,
    instance_type=batch_instance_type,
    max_concurrent_transforms=concurrency,
    max_payload=max_payload_in_mb,
    strategy=batch_strategy,
    output_path=output_data_path,
)

transformer.transform(
    data=s3_tf_record_data,
    content_type="application/x-tfrecord",
    split_type=split_type,
    wait=False,
    job_name=TFRecord_job_name,
)
print(TFRecord_job_name)

Monitor Transform Jobs Status

[ ]:
import time

client = boto3.client("sagemaker")

TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}

while True:
    TFRecord_response = client.describe_transform_job(TransformJobName=TFRecord_job_name)
    CSV_response = client.describe_transform_job(TransformJobName=CSV_job_name)
    CSV_status = CSV_response["TransformJobStatus"]
    TFRecord_status = TFRecord_response["TransformJobStatus"]

    print("CSV Transform Job status: {}".format(CSV_status))
    print("TFRecord Transform Job status: {}".format(TFRecord_status))

    # Keep polling until both jobs have reached a terminal state, so that one
    # job finishing early does not end the loop while the other is still running
    if CSV_status in TERMINAL_STATUSES and TFRecord_status in TERMINAL_STATUSES:
        break
    time.sleep(5)
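
If the polling logic is needed in more than one place, it can be factored into a helper that also enforces a timeout. The following is a sketch under stated assumptions: wait_for_jobs and its parameters are hypothetical names, and the status lookup is passed in as a callable so the example can run here against a stand-in instead of a real SageMaker client.

```python
import time

TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_jobs(job_names, get_status, poll_seconds=5, timeout_seconds=3600, sleep=time.sleep):
    """Poll until every job reaches a terminal status; return {job_name: status}."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        statuses = {name: get_status(name) for name in job_names}
        if all(status in TERMINAL_STATUSES for status in statuses.values()):
            return statuses
        if time.monotonic() >= deadline:
            raise TimeoutError(f"jobs still running: {statuses}")
        sleep(poll_seconds)


# Stand-in for describe_transform_job: each job reports InProgress before completing.
fake_history = {
    "csv-job": iter(["InProgress", "Completed", "Completed"]),
    "tfrecord-job": iter(["InProgress", "InProgress", "Completed"]),
}
final = wait_for_jobs(
    list(fake_history),
    get_status=lambda name: next(fake_history[name]),
    sleep=lambda seconds: None,  # skip real waiting in this demo
)
print(final)
```

With the boto3 client from the cell above, get_status could be lambda name: client.describe_transform_job(TransformJobName=name)["TransformJobStatus"].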
