SageMaker Inference Recommender




1. Introduction

SageMaker Inference Recommender is a new capability of SageMaker that reduces the time required to get machine learning (ML) models in production by automating performance benchmarking and load testing models across SageMaker ML instances. You can use Inference Recommender to deploy your model to a real-time inference endpoint that delivers the best performance at the lowest cost.

You can get started with Inference Recommender in minutes, receive instance recommendations, and obtain an optimized endpoint configuration within hours, eliminating weeks of manual testing and tuning.

2. Setup

Note that we are using the conda_tensorflow2_p36 kernel in SageMaker Notebook Instances, which runs Python 3.6 and TensorFlow 2.1.3. To use the same setup, open the Amazon SageMaker console from the AWS Management Console, choose Notebook Instances, and create a new notebook instance. Upload this notebook and set the kernel. You can also run this notebook in SageMaker Studio with the TensorFlow 2.6 Python 3.8 CPU Optimized kernel.

In the next steps, you’ll import standard methods and libraries as well as set variables that will be used in this notebook. The get_execution_role function retrieves the AWS Identity and Access Management (IAM) role you created at the time of creating your notebook instance.

[ ]:
!pip install --upgrade pip awscli botocore boto3  --quiet
[ ]:
from sagemaker import get_execution_role, Session, image_uris
import boto3
import time
[ ]:
region = boto3.Session().region_name
role = get_execution_role()
sm_client = boto3.client("sagemaker", region_name=region)
sagemaker_session = Session()

3. Machine learning model details

Inference Recommender uses metadata about your ML model to recommend the best instance types and endpoint configurations for deployment. You can provide as much or as little information as you’d like but the more information you provide, the better your recommendations will be.

ML Frameworks: TENSORFLOW, PYTORCH, XGBOOST, SAGEMAKER-SCIKIT-LEARN

ML Domains: COMPUTER_VISION, NATURAL_LANGUAGE_PROCESSING, MACHINE_LEARNING

Example ML Tasks: CLASSIFICATION, REGRESSION, IMAGE_CLASSIFICATION, OBJECT_DETECTION, SEGMENTATION, FILL_MASK, TEXT_CLASSIFICATION, TEXT_GENERATION, OTHER

Note: Select the task that is the closest match to your model. Choose OTHER if none apply.
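
As a rough illustration, the values listed above can be checked before they are sent to the API. This is only a sketch: the helper name check_model_metadata is hypothetical, the sets are copied from the lists in this section, and because the task list is described as examples, an unrecognized task produces a warning rather than an error.

```python
# Values copied from the lists above. The task list is only a set of examples,
# so an unknown task is reported as a warning, not treated as invalid.
KNOWN_FRAMEWORKS = {"TENSORFLOW", "PYTORCH", "XGBOOST", "SAGEMAKER-SCIKIT-LEARN"}
KNOWN_DOMAINS = {"COMPUTER_VISION", "NATURAL_LANGUAGE_PROCESSING", "MACHINE_LEARNING"}
EXAMPLE_TASKS = {
    "CLASSIFICATION", "REGRESSION", "IMAGE_CLASSIFICATION", "OBJECT_DETECTION",
    "SEGMENTATION", "FILL_MASK", "TEXT_CLASSIFICATION", "TEXT_GENERATION", "OTHER",
}


def check_model_metadata(framework, domain, task):
    """Return a list of warning strings (hypothetical helper; empty means no issues)."""
    warnings = []
    if framework.upper() not in KNOWN_FRAMEWORKS:
        warnings.append("Unrecognized framework: {}".format(framework))
    if domain.upper() not in KNOWN_DOMAINS:
        warnings.append("Unrecognized domain: {}".format(domain))
    if task.upper() not in EXAMPLE_TASKS:
        warnings.append("Task {} is not in the example list; consider OTHER".format(task))
    return warnings


print(check_model_metadata("tensorflow", "COMPUTER_VISION", "IMAGE_CLASSIFICATION"))
```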

[ ]:
import tensorflow as tf

# ML framework details
framework = "tensorflow"
# Note that only the framework major and minor version is supported for Neo compilation
framework_version = ".".join(tf.__version__.split(".")[:-1])

# model name as standardized by model zoos or a similar open source model
model_name = "resnet50"

# ML model details
ml_domain = "COMPUTER_VISION"
ml_task = "IMAGE_CLASSIFICATION"

print("TF Version", framework_version)
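
The slicing in the cell above drops the last component of the version string, which assumes a three-part version such as 2.1.3. A slightly more defensive sketch keeps the first two components instead; the helper name major_minor is hypothetical and not part of any SDK.

```python
def major_minor(version):
    """Keep only the major and minor components of a dotted version string."""
    parts = version.split(".")
    return ".".join(parts[:2])


print(major_minor("2.1.3"))  # 2.1
print(major_minor("2.6"))    # 2.6
```

In the notebook this could be called as major_minor(tf.__version__).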

4. Create a model archive

SageMaker models need to be packaged in .tar.gz files. When your SageMaker Endpoint is provisioned, the files in the archive will be extracted and put in /opt/ml/model/ on the Endpoint.

This step includes two optional tasks:

  1. Download a pretrained model from Keras applications

  2. Download a sample inference script (inference.py) from S3

These tasks are provided as a sample reference but can and should be modified when using your own trained models with Inference Recommender.

Optional: Download model from Keras applications

Let’s download the model from Keras applications. By setting the variable download_the_model=False, you can skip the download and provide your own model archive.

[ ]:
download_the_model = True
[ ]:
import os
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras import backend
[ ]:
if download_the_model:
    tf.keras.backend.set_learning_phase(0)
    input_tensor = tf.keras.Input(name="input_1", shape=(224, 224, 3))
    model = tf.keras.applications.resnet50.ResNet50(input_tensor=input_tensor)

    # Create the directory structure
    model_version = "1"
    export_dir = "./model/" + model_version
    if not os.path.exists(export_dir):
        os.makedirs(export_dir)
        print("Directory ", export_dir, " Created ")
    else:
        print("Directory ", export_dir, " already exists")

    # Save to SavedModel
    model.save(export_dir, save_format="tf", include_optimizer=False)
[ ]:
os.makedirs("code", exist_ok=True)  # safe to re-run if the directory already exists
[ ]:
%%writefile code/inference.py

import io
import json
import numpy as np
from PIL import Image

IMAGE_SIZE = (224, 224)


def input_handler(data, context):
    """Pre-process request input before it is sent to TensorFlow Serving REST API
    https://github.com/aws/amazon-sagemaker-examples/blob/0e57a288f54910a50dcbe3dfe2acb8d62e3b3409/sagemaker-python-sdk/tensorflow_serving_container/sample_utils.py#L61

    Args:
        data (obj): the request data stream
        context (Context): an object containing request and configuration details

    Returns:
        (dict): a JSON-serializable dict that contains request body and headers
    """

    if context.request_content_type == "application/x-image":
        # np.frombuffer replaces np.fromstring, which is deprecated for binary data
        buf = np.frombuffer(data.read(), np.uint8)
        image = Image.open(io.BytesIO(buf)).resize(IMAGE_SIZE)
        image = np.array(image)
        image = np.expand_dims(image, axis=0)
        return json.dumps({"instances": image.tolist()})
    else:
        _return_error(
            415, 'Unsupported content type "{}"'.format(context.request_content_type or "Unknown")
        )


def output_handler(response, context):
    """Post-process TensorFlow Serving output before it is returned to the client.

    Args:
        response (obj): the TensorFlow serving response
        context (Context): an object containing request and configuration details

    Returns:
        (bytes, string): data to return to client, response content type
    """
    if response.status_code != 200:
        _return_error(response.status_code, response.content.decode("utf-8"))
    response_content_type = context.accept_header
    prediction = response.content
    return prediction, response_content_type


def _return_error(code, message):
    raise ValueError("Error: {}, {}".format(str(code), message))
[ ]:
%%writefile code/requirements.txt

numpy
pillow

Create a tarball

To bring your own TensorFlow model, SageMaker expects a single archive file in .tar.gz format, containing a model file (.pb) in TF SavedModel format and the script (.py) for inference.

[ ]:
model_archive_name = "tfmodel.tar.gz"
[ ]:
!tar -cvpzf {model_archive_name} ./model ./code
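
If a shell is not available, the same layout can be produced and checked with Python's standard tarfile module. The following is a minimal sketch, not the notebook's method: build_model_archive and list_archive are hypothetical helper names, and the demo packs empty placeholder files rather than a real SavedModel.

```python
import os
import tarfile
import tempfile


def build_model_archive(archive_path, source_dirs):
    """Pack each directory into a gzip-compressed tar under its base name."""
    with tarfile.open(archive_path, "w:gz") as tar:
        for directory in source_dirs:
            arcname = os.path.basename(os.path.normpath(directory))
            tar.add(directory, arcname=arcname)
    return archive_path


def list_archive(archive_path):
    """Return sorted member names so the layout can be checked before upload."""
    with tarfile.open(archive_path, "r:gz") as tar:
        return sorted(tar.getnames())


# Demo on throwaway placeholder files (not a real model).
with tempfile.TemporaryDirectory() as workdir:
    model_dir = os.path.join(workdir, "model", "1")
    code_dir = os.path.join(workdir, "code")
    os.makedirs(model_dir)
    os.makedirs(code_dir)
    open(os.path.join(model_dir, "saved_model.pb"), "wb").close()
    open(os.path.join(code_dir, "inference.py"), "w").close()

    archive = build_model_archive(
        os.path.join(workdir, "demo.tar.gz"),
        [os.path.join(workdir, "model"), code_dir],
    )
    demo_members = list_archive(archive)

print(demo_members)
```

Listing the members before uploading is a cheap way to confirm that the model directory and the code directory both sit at the top level of the archive.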

Upload to S3

We now have a model archive ready. We need to upload it to S3 before we can use it with Inference Recommender. We will use the SageMaker Python SDK to handle the upload.

[ ]:
# model package tarball (model artifact + inference code)
model_url = sagemaker_session.upload_data(path=model_archive_name, key_prefix="tfmodel")
print("model uploaded to: {}".format(model_url))

5. Create a sample payload archive

We need to create an archive that contains individual files that Inference Recommender can send to your Endpoint. Inference Recommender will randomly sample files from this archive so make sure it contains a similar distribution of payloads you’d expect in production. Note that your inference code must be able to read in the file formats from the sample payload.

Here we are only adding four images for the example. For your own use case(s), it’s recommended to add a variety of samples that is representative of your payloads.

[ ]:
payload_archive_name = "tf_payload.tar.gz"
[ ]:
## optional: download sample images
SAMPLES_BUCKET = f"sagemaker-example-files-prod-{region}"
PREFIX = "datasets/image/pets/"
payload_location = "./sample-payload/"

if not os.path.exists(payload_location):
    os.makedirs(payload_location)
    print("Directory ", payload_location, " Created ")
else:
    print("Directory ", payload_location, " already exists")

sagemaker_session.download_data(payload_location, SAMPLES_BUCKET, PREFIX)

Tar the payload

[ ]:
!cd ./sample-payload/ && tar czvf ../{payload_archive_name} *
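
Since Inference Recommender samples files from this archive, it can help to look at what the archive actually contains before uploading it. The sketch below counts files per extension and sums their uncompressed size using only the standard library; summarize_payload_archive is a hypothetical helper, and the demo uses two tiny placeholder files instead of real images.

```python
import collections
import io
import os
import tarfile
import tempfile


def summarize_payload_archive(archive_path):
    """Count regular files per extension and total their uncompressed size."""
    counts = collections.Counter()
    total_bytes = 0
    with tarfile.open(archive_path, "r:gz") as tar:
        for member in tar.getmembers():
            if member.isfile():
                extension = os.path.splitext(member.name)[1].lower() or "(none)"
                counts[extension] += 1
                total_bytes += member.size
    return dict(counts), total_bytes


# Demo with two tiny placeholder files (not real images).
with tempfile.TemporaryDirectory() as workdir:
    demo_path = os.path.join(workdir, "demo_payload.tar.gz")
    with tarfile.open(demo_path, "w:gz") as tar:
        for name, content in [("a.jpg", b"123"), ("b.JPG", b"45")]:
            info = tarfile.TarInfo(name=name)
            info.size = len(content)
            tar.addfile(info, io.BytesIO(content))
    demo_counts, demo_bytes = summarize_payload_archive(demo_path)

print(demo_counts, demo_bytes)
```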

Upload to S3

Next, we’ll upload the packaged payload examples (tf_payload.tar.gz) created above to S3. The S3 location will be used as input to our Inference Recommender job later in this notebook.

[ ]:
sample_payload_url = sagemaker_session.upload_data(
    path=payload_archive_name, key_prefix="tf_payload"
)

6. Register model in Model Registry

In order to use Inference Recommender, you must have a versioned model in SageMaker Model Registry. To register a model in the Model Registry, you must have a model artifact packaged in a tarball and an inference container image. Registering a model includes the following steps:

  1. Create Model Group: This is a one-time task per machine learning use case. A Model Group contains one or more versions of your packaged model.

  2. Register Model Version/Package: This task is performed for each new packaged model version.

Container image URL

If you don’t have an inference container image, you can use one of the open source AWS Deep Learning Containers (DLCs) provided by AWS to serve your ML model. The code below retrieves a DLC based on your ML framework, framework version, python version, and instance type.

[ ]:
instance_type = "ml.c5.xlarge"  # Note: you can use any CPU-based instance type here, this is just to get a CPU tagged image
dlc_uri = image_uris.retrieve(
    framework,
    region,
    version=framework_version,
    py_version="py3",
    instance_type=instance_type,
    image_scope="inference",
)
dlc_uri

Create Model Group

[ ]:
model_package_group_name = "{}-cpu-models-".format(framework) + str(round(time.time()))
model_package_group_description = "{} models".format(ml_task.lower())

model_package_group_input_dict = {
    "ModelPackageGroupName": model_package_group_name,
    "ModelPackageGroupDescription": model_package_group_description,
}

create_model_package_group_response = sm_client.create_model_package_group(
    **model_package_group_input_dict
)
print(
    "ModelPackageGroup Arn : {}".format(create_model_package_group_response["ModelPackageGroupArn"])
)

Register Model Version/Package

In this step, you’ll register your pretrained model that was packaged in the prior steps as a new version in SageMaker Model Registry. First, you’ll configure the model package/version identifying which model package group this new model should be registered within as well as identify the initial approval status. You’ll also identify the domain and task for your model. These values were set earlier in the notebook where ml_domain = 'COMPUTER_VISION' and ml_task = 'IMAGE_CLASSIFICATION'

Note: ModelApprovalStatus is a configuration parameter that can be used in conjunction with SageMaker Projects to trigger an automated deployment pipeline.

[ ]:
model_package_description = "{} {} inference recommender".format(framework, model_name)

model_approval_status = "PendingManualApproval"

create_model_package_input_dict = {
    "ModelPackageGroupName": model_package_group_name,
    "Domain": ml_domain.upper(),
    "Task": ml_task.upper(),
    "SamplePayloadUrl": sample_payload_url,
    "ModelPackageDescription": model_package_description,
    "ModelApprovalStatus": model_approval_status,
}

Set up inference specification

You’ll now set up the inference specification for your model version. This contains information on how the model should be hosted.

Inference Recommender expects a single input MIME type for sending requests. Learn more about common inference data formats on SageMaker. This MIME type will be sent in the Content-Type header when invoking your endpoint.

[ ]:
input_mime_types = ["application/x-image"]

If you specify a non-empty list of instance types below, Inference Recommender limits its recommendations to those instance types. For this example, we provide a list of instance types commonly used for image classification.

[ ]:
supported_realtime_inference_types = ["ml.c5.xlarge", "ml.m5.large", "ml.inf1.xlarge"]

Optional: Model optimization

Amazon SageMaker Neo is a capability of SageMaker that automatically optimizes your ML models for any target instance type. With Neo, you don’t need to set up third-party or framework-specific compiler software, or tune the model manually for optimizing inference performance.

Inference Recommender compiles your model using SageMaker Neo if the ModelInput field is provided. To prepare the inputs for model compilation, specify the input layer name and shape (NHWC format for TF) for your trained model. The dictionary format required is as follows:

For one input: {'input':[1,224,224,3]}
[ ]:
data_input_configuration = '{"input_1":[1,224,224,3]}'
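
Writing the JSON string by hand is error-prone once a model has several inputs. As a small sketch, the string can be generated from a Python dict with json.dumps; make_data_input_config is a hypothetical helper name, and the compact separators are chosen only so the output matches the hand-written string above.

```python
import json


def make_data_input_config(input_shapes):
    """Serialize a {layer_name: shape} mapping to a compact JSON string."""
    return json.dumps(input_shapes, separators=(",", ":"))


example_config = make_data_input_config({"input_1": [1, 224, 224, 3]})
print(example_config)  # {"input_1":[1,224,224,3]}
```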

If you don’t know your input layer name or shape, you can use the saved_model_cli. Learn more

[ ]:
!saved_model_cli show --dir {export_dir} --all

Now that you’ve collected all the ModelPackage details, the next step is to create the Model Version in Model Registry.

[ ]:
modelpackage_inference_specification = {
    "InferenceSpecification": {
        "Containers": [
            {
                "Image": dlc_uri,
                "Framework": framework.upper(),
                "FrameworkVersion": framework_version,
                "NearestModelName": model_name,
                "ModelInput": {"DataInputConfig": data_input_configuration},
            }
        ],
        "SupportedContentTypes": input_mime_types,  # required, must be non-null
        "SupportedResponseMIMETypes": [],
        "SupportedRealtimeInferenceInstanceTypes": supported_realtime_inference_types,  # optional
    }
}

# Specify the model data
modelpackage_inference_specification["InferenceSpecification"]["Containers"][0][
    "ModelDataUrl"
] = model_url
[ ]:
create_model_package_input_dict.update(modelpackage_inference_specification)
[ ]:
create_model_package_response = sm_client.create_model_package(**create_model_package_input_dict)
model_package_arn = create_model_package_response["ModelPackageArn"]
print("ModelPackage Version ARN : {}".format(model_package_arn))
[ ]:
sm_client.describe_model_package(ModelPackageName=model_package_arn)

Alternative Option: ContainerConfig

If you are missing mandatory fields to create an inference recommender job in your model package version like so (this create_model_package_input_dict is missing Domain, Task, and SamplePayloadUrl):

create_model_package_input_dict = {
    "ModelPackageGroupName": model_package_group_name,
    "ModelPackageDescription": model_package_description,
    "ModelApprovalStatus": model_approval_status,
}

You may define the fields Domain, Task, and SamplePayloadUrl in the optional field ContainerConfig like so:

payload_config = {
    "SamplePayloadUrl": sample_payload_url,
}

container_config = {
    "Domain": ml_domain.upper(),
    "Task": ml_task.upper(),
    "PayloadConfig": payload_config,
}

And then provide it directly within create_inference_recommendations_job() API like so:

default_response = sm_client.create_inference_recommendations_job(
    JobName=str(default_job),
    JobDescription="",
    JobType="Default",
    RoleArn=role,
    InputConfig={
        "ModelPackageVersionArn": model_package_arn,
        "ContainerConfig": container_config
    },
)

For more information on what else can be provided via ContainerConfig please refer to the CreateInferenceRecommendationsJob doc here: CreateInferenceRecommendationsJob
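
One way to decide between the two options is to inspect the model package description and add ContainerConfig only when the fields are missing. The sketch below works on plain dicts so it can be run without AWS access; missing_recommender_fields and build_input_config are hypothetical helpers, and it assumes the description dict has the top-level keys Domain, Task, and SamplePayloadUrl when they were set at registration time. The ARN and S3 URL in the demo are placeholders.

```python
REQUIRED_FOR_RECOMMENDER = ("Domain", "Task", "SamplePayloadUrl")


def missing_recommender_fields(model_package_description):
    """List the required fields that are absent or empty in the description."""
    return [key for key in REQUIRED_FOR_RECOMMENDER if not model_package_description.get(key)]


def build_input_config(model_package_arn, model_package_description, domain, task, payload_url):
    """Build InputConfig, adding ContainerConfig only if fields are missing."""
    input_config = {"ModelPackageVersionArn": model_package_arn}
    if missing_recommender_fields(model_package_description):
        input_config["ContainerConfig"] = {
            "Domain": domain,
            "Task": task,
            "PayloadConfig": {"SamplePayloadUrl": payload_url},
        }
    return input_config


# Placeholder values for illustration only.
demo_arn = "arn:aws:sagemaker:region:account:model-package/example/1"
demo_incomplete = {"ModelPackageGroupName": "example"}
demo_config = build_input_config(
    demo_arn, demo_incomplete, "COMPUTER_VISION", "IMAGE_CLASSIFICATION", "s3://bucket/payload.tar.gz"
)
print(sorted(demo_config))
```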

7. Create a SageMaker Inference Recommender Default Job

Now with your model in Model Registry, you can kick off a ‘Default’ job to get instance recommendations. This only requires your ModelPackageVersionArn and comes back with recommendations within an hour.

The output is a list of instance type recommendations with associated environment variables, cost, throughput and latency metrics.

[ ]:
import boto3
import uuid
from sagemaker import get_execution_role

inference_client = boto3.client("sagemaker", region)

role = get_execution_role()
default_job = uuid.uuid1()
default_response = inference_client.create_inference_recommendations_job(
    JobName=str(default_job),
    JobDescription="",
    JobType="Default",
    RoleArn=role,
    InputConfig={"ModelPackageVersionArn": model_package_arn},
)

print(default_response)

8. Instance Recommendation Results

Each inference recommendation includes InstanceType, InitialInstanceCount, and EnvironmentParameters, which are environment variables tuned for better performance. It also includes performance and cost metrics such as MaxInvocations, ModelLatency, CostPerHour, and CostPerInference. These metrics can help you narrow down to a specific endpoint configuration that suits your use case.

Example:

If your motivation is overall price-performance with an emphasis on throughput, then you should focus on CostPerInference metrics
If your motivation is a balance between latency and throughput, then you should focus on ModelLatency / MaxInvocations metrics

Metric: Description

ModelLatency: The interval of time taken by a model to respond as viewed from SageMaker. This interval includes the local communication times taken to send the request and to fetch the response from the container of a model and the time taken to complete the inference in the container. Units: Milliseconds

MaxInvocations: The maximum number of InvokeEndpoint requests sent to an endpoint per minute. Units: None

CostPerHour: The estimated cost per hour for your real-time endpoint. Units: US Dollars

CostPerInference: The estimated cost per inference for your real-time endpoint. Units: US Dollars

[ ]:
import pprint
import pandas as pd

inference_client = boto3.client("sagemaker", region)

stopped = False
while not stopped:
    inference_recommender_job = inference_client.describe_inference_recommendations_job(
        JobName=str(default_job)
    )
    if inference_recommender_job["Status"] in ["COMPLETED", "STOPPED", "FAILED"]:
        stopped = True
    else:
        print("Inference recommender job in progress")
        time.sleep(600)

if inference_recommender_job["Status"] == "FAILED":
    print("Inference recommender job failed ")
    print("Failed Reason: {}".format(inference_recommender_job["FailureReason"]))
else:
    print("Inference recommender job completed")

Detailing out the result

[ ]:
data = [
    {**x["EndpointConfiguration"], **x["ModelConfiguration"], **x["Metrics"]}
    for x in inference_recommender_job["InferenceRecommendations"]
]
df = pd.DataFrame(data)
dropFilter = df.filter(["VariantName"])
df.drop(dropFilter, inplace=True, axis=1)
pd.set_option("max_colwidth", 400)
df.head()
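
To illustrate the trade-off described at the start of this section, the sketch below picks the recommendation with the lowest CostPerInference among those that meet a latency limit. pick_recommendation is a hypothetical helper, and the numbers in the demo are made up for illustration; they are not real benchmark results.

```python
def pick_recommendation(recommendations, max_latency_ms=None):
    """Return the cheapest-per-inference entry within the latency limit, or None."""
    candidates = [
        rec for rec in recommendations
        if max_latency_ms is None or rec["Metrics"]["ModelLatency"] <= max_latency_ms
    ]
    if not candidates:
        return None
    return min(candidates, key=lambda rec: rec["Metrics"]["CostPerInference"])


# Made-up values for illustration only (not measured results).
sample_recommendations = [
    {
        "EndpointConfiguration": {"InstanceType": "ml.c5.xlarge"},
        "Metrics": {"CostPerInference": 2.0e-6, "ModelLatency": 40},
    },
    {
        "EndpointConfiguration": {"InstanceType": "ml.m5.large"},
        "Metrics": {"CostPerInference": 1.0e-6, "ModelLatency": 900},
    },
]

best = pick_recommendation(sample_recommendations, max_latency_ms=500)
print(best["EndpointConfiguration"]["InstanceType"])
```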

Optional: ListInferenceRecommendationsJobSteps

To see the list of subtasks for an Inference Recommender job, simply provide the JobName to the ListInferenceRecommendationsJobSteps API.

To see more information for the API, please refer to the doc here: ListInferenceRecommendationsJobSteps

[ ]:
list_job_steps_response = inference_client.list_inference_recommendations_job_steps(
    JobName=str(default_job)
)
print(list_job_steps_response)
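
A single call may not return every step if the response is paginated. The sketch below follows NextToken until the list is exhausted; iter_job_steps is a hypothetical helper, it assumes the response carries Steps and NextToken keys, and the demo uses a fake client so it runs without AWS access.

```python
def iter_job_steps(client, job_name):
    """Yield every step of a job, following NextToken across pages."""
    kwargs = {"JobName": job_name}
    while True:
        response = client.list_inference_recommendations_job_steps(**kwargs)
        for step in response.get("Steps", []):
            yield step
        token = response.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token


class FakeClient:
    """Stand-in for the boto3 client, returning two pages of placeholder steps."""

    def list_inference_recommendations_job_steps(self, JobName, NextToken=None):
        if NextToken is None:
            return {"Steps": [{"StepType": "BENCHMARK", "Status": "COMPLETED"}], "NextToken": "page-2"}
        return {"Steps": [{"StepType": "BENCHMARK", "Status": "FAILED"}]}


demo_steps = list(iter_job_steps(FakeClient(), "example-job"))
print(len(demo_steps))
```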

9. Custom Load Test

With an ‘Advanced’ job, you can provide your production requirements, select instance types, tune environment variables and perform more extensive load tests. This typically takes 2 hours depending on your traffic pattern and number of instance types.

The output is a list of endpoint configuration recommendations (instance type, instance count, environment variables) with associated cost, throughput and latency metrics.

In the below example, we are tuning the endpoint against an environment variable OMP_NUM_THREADS with values [1, 2, 4] and we aim to limit the latency requirement to 500 ms. The goal is to find which value for OMP_NUM_THREADS provides the best performance.

For some context, many numerical libraries used from Python rely on OpenMP for multithreading within a process. The default value of OMP_NUM_THREADS is typically the number of CPU cores. However, on CPUs with Simultaneous Multithreading (SMT), such as Intel’s Hyper-Threading, a process might oversubscribe a core by spawning twice as many threads as there are physical CPU cores. In certain cases, a Python process might end up spawning up to four times as many threads as there are physical cores. Therefore, if worker threads have oversubscribed the available cores, an ideal setting for this parameter is 1 or half the number of CPU cores on an SMT-enabled CPU.
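
Following that reasoning, a set of candidate values can be derived from the CPU count: 1, half the logical CPUs, and all of them. This is only a sketch of one reasonable heuristic; omp_thread_candidates is a hypothetical helper, and the demo passes 4 on the assumption that the target instance exposes 4 vCPUs.

```python
import os


def omp_thread_candidates(logical_cpus=None):
    """Return candidate OMP_NUM_THREADS values as strings: 1, half, and all CPUs."""
    count = logical_cpus or os.cpu_count() or 1
    values = sorted({1, max(1, count // 2), count})
    return [str(value) for value in values]


print(omp_thread_candidates(4))  # ['1', '2', '4']
```

The values are returned as strings because environment variable values are passed as strings in the job configuration.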

[ ]:
instance_type = "ml.c5.xlarge"
[ ]:
import boto3
import uuid

inference_client = boto3.client("sagemaker", region)

role = get_execution_role()
advanced_job = uuid.uuid1()
advanced_response = inference_client.create_inference_recommendations_job(
    JobName=str(advanced_job),
    JobDescription="",
    JobType="Advanced",
    RoleArn=role,
    InputConfig={
        "ModelPackageVersionArn": model_package_arn,
        "JobDurationInSeconds": 7200,
        "EndpointConfigurations": [
            {
                "InstanceType": instance_type,
                "EnvironmentParameterRanges": {
                    "CategoricalParameterRanges": [
                        {"Name": "OMP_NUM_THREADS", "Value": ["1", "2", "4"]}
                    ]
                },
            }
        ],
        "ResourceLimit": {"MaxNumberOfTests": 3, "MaxParallelOfTests": 1},
        "TrafficPattern": {
            "TrafficType": "PHASES",
            "Phases": [{"InitialNumberOfUsers": 1, "SpawnRate": 1, "DurationInSeconds": 120}],
        },
    },
    StoppingConditions={
        "MaxInvocations": 1000,
        "ModelLatencyThresholds": [{"Percentile": "P95", "ValueInMilliseconds": 500}],
    },
)

print(advanced_response)
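
When testing several instance types or environment variables, the EndpointConfigurations list can be generated rather than typed out. The sketch below builds that list from plain Python inputs and counts the resulting combinations, which may be useful as a reference when choosing MaxNumberOfTests. Both helper names are hypothetical, and the count is simple arithmetic on the inputs, not a statement about how the service schedules tests.

```python
def build_endpoint_configurations(instance_types, env_values):
    """Build one EndpointConfigurations entry per instance type."""
    ranges = [
        {"Name": name, "Value": [str(value) for value in values]}
        for name, values in env_values.items()
    ]
    return [
        {
            "InstanceType": instance_type,
            "EnvironmentParameterRanges": {"CategoricalParameterRanges": ranges},
        }
        for instance_type in instance_types
    ]


def count_combinations(instance_types, env_values):
    """Number of (instance type, environment value) combinations in the inputs."""
    per_instance = 1
    for values in env_values.values():
        per_instance *= len(values)
    return len(instance_types) * per_instance


demo_env = {"OMP_NUM_THREADS": [1, 2, 4]}
demo_configs = build_endpoint_configurations(["ml.c5.xlarge"], demo_env)
print(demo_configs[0]["EnvironmentParameterRanges"], count_combinations(["ml.c5.xlarge"], demo_env))
```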

10. Custom Load Test Results

[ ]:
import boto3
import uuid
import pprint
import pandas as pd

inference_client = boto3.client("sagemaker", region)

stopped = False
while not stopped:
    inference_recommender_job = inference_client.describe_inference_recommendations_job(
        JobName=str(advanced_job)
    )
    if inference_recommender_job["Status"] in ["COMPLETED", "STOPPED", "FAILED"]:
        stopped = True
    else:
        print("Inference recommender job in progress")
        time.sleep(600)

if inference_recommender_job["Status"] == "FAILED":
    print("Inference recommender job failed ")
    print("Failed Reason: {}".format(inference_recommender_job["FailureReason"]))
else:
    print("Inference recommender job completed")

Detailing out the result

[ ]:
data = [
    {**x["EndpointConfiguration"], **x["ModelConfiguration"], **x["Metrics"]}
    for x in inference_recommender_job["InferenceRecommendations"]
]
df = pd.DataFrame(data)
dropFilter = df.filter(["VariantName"])
df.drop(dropFilter, inplace=True, axis=1)
pd.set_option("max_colwidth", 400)
df.head()
