SageMaker Inference Recommender
This notebook’s CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.
1. Introduction
SageMaker Inference Recommender is a new capability of SageMaker that reduces the time required to get machine learning (ML) models in production by automating performance benchmarking and load testing models across SageMaker ML instances. You can use Inference Recommender to deploy your model to a real-time inference endpoint that delivers the best performance at the lowest cost.
Get started with Inference Recommender on SageMaker in minutes: select an instance in minutes and get an optimized endpoint configuration in hours, eliminating weeks of manual testing and tuning time.
2. Setup
Note that we are using the conda_tensorflow2_p36 kernel in SageMaker Notebook Instances. This is running Python 3.6 and TensorFlow 2.1.3. If you’d like to use the same setup, in the AWS Management Console, go to the Amazon SageMaker console. Choose Notebook Instances, and choose Create notebook instance. Upload the current notebook and set the kernel. You can also run this in SageMaker Studio Notebooks with the TensorFlow 2.6 Python 3.8 CPU Optimized kernel.
In the next steps, you’ll import standard methods and libraries as well as set variables that will be used in this notebook. The get_execution_role function retrieves the AWS Identity and Access Management (IAM) role you created at the time of creating your notebook instance.
[ ]:
!pip install --upgrade pip awscli botocore boto3 --quiet
[ ]:
from sagemaker import get_execution_role, Session, image_uris
import boto3
import time
[ ]:
region = boto3.Session().region_name
role = get_execution_role()
sm_client = boto3.client("sagemaker", region_name=region)
sagemaker_session = Session()
3. Machine learning model details
Inference Recommender uses metadata about your ML model to recommend the best instance types and endpoint configurations for deployment. You can provide as much or as little information as you’d like but the more information you provide, the better your recommendations will be.
ML Frameworks: TENSORFLOW, PYTORCH, XGBOOST, SAGEMAKER-SCIKIT-LEARN
ML Domains: COMPUTER_VISION, NATURAL_LANGUAGE_PROCESSING, MACHINE_LEARNING
Example ML Tasks: CLASSIFICATION, REGRESSION, IMAGE_CLASSIFICATION, OBJECT_DETECTION, SEGMENTATION, FILL_MASK, TEXT_CLASSIFICATION, TEXT_GENERATION, OTHER
Note: Select the task that is the closest match to your model. Choose OTHER if none apply.
[ ]:
import tensorflow as tf
# ML framework details
framework = "tensorflow"
# Note that only the framework major and minor version is supported for Neo compilation
framework_version = ".".join(tf.__version__.split(".")[:-1])
# model name as standardized by model zoos or a similar open source model
model_name = "resnet50"
# ML model details
ml_domain = "COMPUTER_VISION"
ml_task = "IMAGE_CLASSIFICATION"
print("TF Version", framework_version)
4. Create a model archive
SageMaker models need to be packaged in .tar.gz files. When your SageMaker Endpoint is provisioned, the files in the archive will be extracted and put in /opt/ml/model/ on the Endpoint.
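To make the expected layout concrete, here is a quick sketch with placeholder files (your real archive holds the SavedModel and inference code built later in this notebook) showing how such an archive is built and inspected:

```shell
# Illustrative archive layout; SageMaker extracts these files
# under /opt/ml/model/ on the endpoint. File contents are placeholders.
mkdir -p demo/model/1 demo/code
touch demo/model/1/saved_model.pb demo/code/inference.py demo/code/requirements.txt
(cd demo && tar -czf ../tfmodel-demo.tar.gz ./model ./code)
tar -tzf tfmodel-demo.tar.gz
```

The listing shows the relative paths exactly as they will appear under /opt/ml/model/ after extraction.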
In this step, there are two optional tasks:
Download a pretrained model from Keras applications
Download a sample inference script (inference.py) from S3
These tasks are provided as a sample reference but can and should be modified when using your own trained models with Inference Recommender.
Optional: Download model from Keras applications
Let’s download the model from Keras applications. By setting the variable download_the_model=False, you can skip the download and provide your own model archive.
[ ]:
download_the_model = True
[ ]:
import os
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras import backend
[ ]:
if download_the_model:
    tf.keras.backend.set_learning_phase(0)
    input_tensor = tf.keras.Input(name="input_1", shape=(224, 224, 3))
    model = tf.keras.applications.resnet50.ResNet50(input_tensor=input_tensor)

    # Creating the directory structure
    model_version = "1"
    export_dir = "./model/" + model_version
    if not os.path.exists(export_dir):
        os.makedirs(export_dir)
        print("Directory ", export_dir, " Created ")
    else:
        print("Directory ", export_dir, " already exists")

    # Save to SavedModel format
    model.save(export_dir, save_format="tf", include_optimizer=False)
[ ]:
os.makedirs("code", exist_ok=True)
[ ]:
%%writefile code/inference.py
import io
import json

import numpy as np
from PIL import Image

IMAGE_SIZE = (224, 224)


def input_handler(data, context):
    """Pre-process request input before it is sent to TensorFlow Serving REST API

    https://github.com/aws/amazon-sagemaker-examples/blob/0e57a288f54910a50dcbe3dfe2acb8d62e3b3409/sagemaker-python-sdk/tensorflow_serving_container/sample_utils.py#L61

    Args:
        data (obj): the request data stream
        context (Context): an object containing request and configuration details

    Returns:
        (dict): a JSON-serializable dict that contains request body and headers
    """
    if context.request_content_type == "application/x-image":
        buf = np.frombuffer(data.read(), np.uint8)
        image = Image.open(io.BytesIO(buf)).resize(IMAGE_SIZE)
        image = np.array(image)
        image = np.expand_dims(image, axis=0)
        return json.dumps({"instances": image.tolist()})
    else:
        _return_error(
            415, 'Unsupported content type "{}"'.format(context.request_content_type or "Unknown")
        )


def output_handler(response, context):
    """Post-process TensorFlow Serving output before it is returned to the client.

    Args:
        response (obj): the TensorFlow Serving response
        context (Context): an object containing request and configuration details

    Returns:
        (bytes, string): data to return to client, response content type
    """
    if response.status_code != 200:
        _return_error(response.status_code, response.content.decode("utf-8"))
    response_content_type = context.accept_header
    prediction = response.content
    return prediction, response_content_type


def _return_error(code, message):
    raise ValueError("Error: {}, {}".format(str(code), message))
[ ]:
%%writefile code/requirements.txt
numpy
pillow
Create a tarball
To bring your own TensorFlow model, SageMaker expects a single archive file in .tar.gz format, containing a model file (.pb) in TF SavedModel format and the script (.py) for inference.
[ ]:
model_archive_name = "tfmodel.tar.gz"
[ ]:
!tar -cvpzf {model_archive_name} ./model ./code
Upload to S3
We now have a model archive ready. We need to upload it to S3 before we can use it with Inference Recommender. We’ll use the SageMaker Python SDK to handle the upload.
[ ]:
# model package tarball (model artifact + inference code)
model_url = sagemaker_session.upload_data(path=model_archive_name, key_prefix="tfmodel")
print("model uploaded to: {}".format(model_url))
5. Create a sample payload archive
We need to create an archive that contains individual files that Inference Recommender can send to your Endpoint. Inference Recommender will randomly sample files from this archive so make sure it contains a similar distribution of payloads you’d expect in production. Note that your inference code must be able to read in the file formats from the sample payload.
Here we are only adding four images for the example. For your own use case(s), it’s recommended to add a variety of samples that are representative of your payloads.
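To illustrate the sampling behavior conceptually, the sketch below builds a tiny archive of placeholder files and draws a random member from it. This is an illustration only, not the service’s actual implementation:

```python
import os
import random
import tarfile

# Build a tiny payload archive with placeholder files
os.makedirs("demo-payload", exist_ok=True)
for name in ["cat.jpg", "dog.jpg", "bird.jpg", "fish.jpg"]:
    with open(os.path.join("demo-payload", name), "wb") as f:
        f.write(b"\x00")

with tarfile.open("demo_payload.tar.gz", "w:gz") as tar:
    for name in sorted(os.listdir("demo-payload")):
        tar.add(os.path.join("demo-payload", name), arcname=name)

# Conceptually, the benchmarker draws random members from the archive
# and sends each one as a request body to the endpoint under test.
with tarfile.open("demo_payload.tar.gz", "r:gz") as tar:
    members = [m.name for m in tar.getmembers() if m.isfile()]

sample = random.choice(members)
print(sorted(members))
```

The more varied the members of the archive, the more representative the benchmark traffic will be.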
[ ]:
payload_archive_name = "tf_payload.tar.gz"
[ ]:
## optional: download sample images
SAMPLES_BUCKET = f"sagemaker-example-files-prod-{region}"
PREFIX = "datasets/image/pets/"
payload_location = "./sample-payload/"

if not os.path.exists(payload_location):
    os.makedirs(payload_location)
    print("Directory ", payload_location, " Created ")
else:
    print("Directory ", payload_location, " already exists")

sagemaker_session.download_data(payload_location, SAMPLES_BUCKET, PREFIX)
Tar the payload
[ ]:
!cd ./sample-payload/ && tar czvf ../{payload_archive_name} *
Upload to S3
Next, we’ll upload the packaged payload examples (tf_payload.tar.gz) that we created above to S3. The S3 location will be used as input to our Inference Recommender job later in this notebook.
[ ]:
sample_payload_url = sagemaker_session.upload_data(
path=payload_archive_name, key_prefix="tf_payload"
)
6. Register model in Model Registry
In order to use Inference Recommender, you must have a versioned model in SageMaker Model Registry. To register a model in the Model Registry, you must have a model artifact packaged in a tarball and an inference container image. Registering a model includes the following steps:
Create Model Group: This is a one-time task per machine learning use case. A Model Group contains one or more versions of your packaged model.
Register Model Version/Package: This task is performed for each new packaged model version.
Container image URL
If you don’t have an inference container image, you can use one of the open source AWS Deep Learning Containers (DLCs) provided by AWS to serve your ML model. The code below retrieves a DLC based on your ML framework, framework version, Python version, and instance type.
[ ]:
instance_type = "ml.c5.xlarge" # Note: you can use any CPU-based instance type here, this is just to get a CPU tagged image
dlc_uri = image_uris.retrieve(
    framework,
    region,
    version=framework_version,
    py_version="py3",
    instance_type=instance_type,
    image_scope="inference",
)
dlc_uri
Create Model Group
[ ]:
model_package_group_name = "{}-cpu-models-".format(framework) + str(round(time.time()))
model_package_group_description = "{} models".format(ml_task.lower())

model_package_group_input_dict = {
    "ModelPackageGroupName": model_package_group_name,
    "ModelPackageGroupDescription": model_package_group_description,
}

create_model_package_group_response = sm_client.create_model_package_group(
    **model_package_group_input_dict
)
print(
    "ModelPackageGroup Arn : {}".format(create_model_package_group_response["ModelPackageGroupArn"])
)
Register Model Version/Package
In this step, you’ll register your pretrained model that was packaged in the prior steps as a new version in SageMaker Model Registry. First, you’ll configure the model package/version, identifying which model package group this new model should be registered within as well as the initial approval status. You’ll also identify the domain and task for your model. These values were set earlier in the notebook, where ml_domain = 'COMPUTER_VISION' and ml_task = 'IMAGE_CLASSIFICATION'.

Note: ModelApprovalStatus is a configuration parameter that can be used in conjunction with SageMaker Projects to trigger automated deployment pipelines.
[ ]:
model_package_description = "{} {} inference recommender".format(framework, model_name)
model_approval_status = "PendingManualApproval"
create_model_package_input_dict = {
"ModelPackageGroupName": model_package_group_name,
"Domain": ml_domain.upper(),
"Task": ml_task.upper(),
"SamplePayloadUrl": sample_payload_url,
"ModelPackageDescription": model_package_description,
"ModelApprovalStatus": model_approval_status,
}
Set up inference specification
You’ll now set up the inference specification configuration for your model version. This contains information on how the model should be hosted.
Inference Recommender expects a single input MIME type for sending requests. Learn more about common inference data formats on SageMaker. This MIME type will be sent in the Content-Type header when invoking your endpoint.
[ ]:
input_mime_types = ["application/x-image"]
If you specify a set of instance types below (i.e., a non-empty list), then Inference Recommender will only generate recommendations within that set. For this example, we provide a list of common instance types used for image classification algorithms.
[ ]:
supported_realtime_inference_types = ["ml.c5.xlarge", "ml.m5.large", "ml.inf1.xlarge"]
Optional: Model optimization
Amazon SageMaker Neo is a capability of SageMaker that automatically optimizes your ML models for any target instance type. With Neo, you don’t need to set up third-party or framework-specific compiler software, or tune the model manually for optimizing inference performance.
Inference Recommender compiles your model using SageMaker Neo if the ModelInput field is provided. To prepare the inputs for model compilation, specify the input layer name and shape (NHWC format for TF) for your trained model. The dictionary format required is as follows:
For one input: {'input':[1,224,224,3]}
[ ]:
data_input_configuration = '{"input_1":[1,224,224,3]}'
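Since DataInputConfig is passed as a JSON string, an optional sanity check is to parse it back and confirm the layer name and shape before registering the model:

```python
import json

data_input_configuration = '{"input_1":[1,224,224,3]}'

# Parse the JSON string to verify it is well-formed and has the
# expected single input layer with a batch-size-1 NHWC shape
parsed = json.loads(data_input_configuration)
print(list(parsed.keys()), parsed["input_1"])
```

A malformed string here would otherwise only surface later, when the compilation step of the Inference Recommender job fails.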
If you don’t know your input layer name or shape, you can use the saved_model_cli tool to inspect the SavedModel.
[ ]:
!saved_model_cli show --dir {export_dir} --all
Now that you’ve collected all the ModelPackage details, the next step is to create the Model Version in Model Registry.
[ ]:
modelpackage_inference_specification = {
    "InferenceSpecification": {
        "Containers": [
            {
                "Image": dlc_uri,
                "Framework": framework.upper(),
                "FrameworkVersion": framework_version,
                "NearestModelName": model_name,
                "ModelInput": {"DataInputConfig": data_input_configuration},
            }
        ],
        "SupportedContentTypes": input_mime_types,  # required, must be non-null
        "SupportedResponseMIMETypes": [],
        "SupportedRealtimeInferenceInstanceTypes": supported_realtime_inference_types,  # optional
    }
}

# Specify the model data
modelpackage_inference_specification["InferenceSpecification"]["Containers"][0][
    "ModelDataUrl"
] = model_url
[ ]:
create_model_package_input_dict.update(modelpackage_inference_specification)
[ ]:
create_mode_package_response = sm_client.create_model_package(**create_model_package_input_dict)
model_package_arn = create_mode_package_response["ModelPackageArn"]
print("ModelPackage Version ARN : {}".format(model_package_arn))
[ ]:
sm_client.describe_model_package(ModelPackageName=model_package_arn)
Alternative Option: ContainerConfig
If your model package version is missing mandatory fields needed to create an Inference Recommender job (for example, this create_model_package_input_dict is missing Domain, Task, and SamplePayloadUrl):
create_model_package_input_dict = {
    "ModelPackageGroupName": model_package_group_name,
    "ModelPackageDescription": model_package_description,
    "ModelApprovalStatus": model_approval_status,
}
you may define the fields Domain, Task, and SamplePayloadUrl in the optional field ContainerConfig like so:
payload_config = {
    "SamplePayloadUrl": sample_payload_url,
}

container_config = {
    "Domain": ml_domain.upper(),
    "Task": ml_task.upper(),
    "PayloadConfig": payload_config,
}
and then provide it directly to the create_inference_recommendations_job() API like so:
default_response = sm_client.create_inference_recommendations_job(
    JobName=str(default_job),
    JobDescription="",
    JobType="Default",
    RoleArn=role,
    InputConfig={
        "ModelPackageVersionArn": model_package_arn,
        "ContainerConfig": container_config,
    },
)
For more information on what else can be provided via ContainerConfig, please refer to the CreateInferenceRecommendationsJob API documentation.
7. Create a SageMaker Inference Recommender Default Job
Now with your model in Model Registry, you can kick off a ‘Default’ job to get instance recommendations. This only requires your ModelPackageVersionArn and comes back with recommendations within an hour.
The output is a list of instance type recommendations with associated environment variables, cost, throughput and latency metrics.
[ ]:
import boto3
import uuid
from sagemaker import get_execution_role

inference_client = boto3.client("sagemaker", region)
role = get_execution_role()

default_job = uuid.uuid1()
default_response = inference_client.create_inference_recommendations_job(
    JobName=str(default_job),
    JobDescription="",
    JobType="Default",
    RoleArn=role,
    InputConfig={"ModelPackageVersionArn": model_package_arn},
)
print(default_response)
print(default_response)
8. Instance Recommendation Results
Each inference recommendation includes InstanceType, InitialInstanceCount, and EnvironmentParameters, which are tuned environment variable parameters for better performance. We also include performance and cost metrics such as MaxInvocations, ModelLatency, CostPerHour and CostPerInference. We believe these metrics will help you narrow down to a specific endpoint configuration that suits your use case.
Example:

- CostPerInference metrics
- ModelLatency / MaxInvocations metrics

| Metric | Description |
|---|---|
| ModelLatency | The interval of time taken by a model to respond as viewed from SageMaker. This interval includes the local communication times taken to send the request and to fetch the response from the container of a model and the time taken to complete the inference in the container. Units: Milliseconds |
| MaximumInvocations | The maximum number of InvokeEndpoint requests sent to an endpoint per minute. Units: None |
| CostPerHour | The estimated cost per hour for your real-time endpoint. Units: US Dollars |
| CostPerInference | The estimated cost per inference for your real-time endpoint. Units: US Dollars |
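As rough intuition for how these metrics relate (an assumed back-of-the-envelope relationship for illustration, not the service’s exact formula): a sustained invocation rate converts an hourly instance price into a per-inference cost.

```python
# Back-of-the-envelope relationship (illustrative only):
# cost per inference ~= hourly instance cost / invocations per hour
cost_per_hour = 0.204            # assumed hourly instance price, US Dollars
invocations_per_minute = 1000    # e.g. a sustained MaxInvocations value
cost_per_inference = cost_per_hour / (invocations_per_minute * 60)
print(cost_per_inference)  # 3.4e-06
```

This is why a pricier instance with much higher throughput can still win on CostPerInference.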
[ ]:
import pprint
import pandas as pd
inference_client = boto3.client("sagemaker", region)

stopped = False
while not stopped:
    inference_recommender_job = inference_client.describe_inference_recommendations_job(
        JobName=str(default_job)
    )
    if inference_recommender_job["Status"] in ["COMPLETED", "STOPPED", "FAILED"]:
        stopped = True
    else:
        print("Inference recommender job in progress")
        time.sleep(600)

if inference_recommender_job["Status"] == "FAILED":
    print("Inference recommender job failed")
    print("Failed Reason: {}".format(inference_recommender_job["FailureReason"]))
else:
    print("Inference recommender job completed")
Detailing out the result
[ ]:
data = [
    {**x["EndpointConfiguration"], **x["ModelConfiguration"], **x["Metrics"]}
    for x in inference_recommender_job["InferenceRecommendations"]
]
df = pd.DataFrame(data)
dropFilter = df.filter(["VariantName"])
df.drop(dropFilter, inplace=True, axis=1)
pd.set_option("display.max_colwidth", 400)
df.head()
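Once the DataFrame is populated, you can rank configurations by the metric you care about. A sketch with synthetic rows standing in for real job output (the instance types and values below are made up):

```python
import pandas as pd

# Synthetic stand-in for the Inference Recommender results DataFrame
df = pd.DataFrame(
    [
        {"InstanceType": "ml.c5.xlarge", "CostPerInference": 2.1e-6, "MaxInvocations": 400},
        {"InstanceType": "ml.m5.large", "CostPerInference": 1.8e-6, "MaxInvocations": 250},
        {"InstanceType": "ml.inf1.xlarge", "CostPerInference": 1.2e-6, "MaxInvocations": 800},
    ]
)

# Cheapest per-inference option first; break ties by throughput
ranked = df.sort_values(["CostPerInference", "MaxInvocations"], ascending=[True, False])
best = ranked.iloc[0]["InstanceType"]
print(best)
```

Sorting by ModelLatency instead would surface the configuration best suited to latency-sensitive workloads.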
Optional: ListInferenceRecommendationsJobSteps

To see the list of subtasks for an Inference Recommender job, simply provide the JobName to the ListInferenceRecommendationsJobSteps API. For more information, please refer to the ListInferenceRecommendationsJobSteps API documentation.
[ ]:
list_job_steps_response = inference_client.list_inference_recommendations_job_steps(
    JobName=str(default_job)
)
print(list_job_steps_response)
print(list_job_steps_response)
9. Custom Load Test
With an ‘Advanced’ job, you can provide your production requirements, select instance types, tune environment variables and perform more extensive load tests. This typically takes about 2 hours, depending on your traffic pattern and the number of instance types.
The output is a list of endpoint configuration recommendations (instance type, instance count, environment variables) with associated cost, throughput and latency metrics.
In the below example, we are tuning the endpoint against an environment variable OMP_NUM_THREADS with values [1, 2, 4], and we aim to limit the latency requirement to 500 ms. The goal is to find which value for OMP_NUM_THREADS provides the best performance.

For some context, Python scientific libraries internally use OpenMP for implementing multithreading within processes. The default value for OMP_NUM_THREADS is equal to the number of CPU cores. However, when implemented on top of Simultaneous Multi-Threading (SMT), such as Intel’s Hyper-Threading, a process might oversubscribe a particular core by spawning twice as many threads as there are actual CPU cores. In certain cases, a Python binary might end up spawning up to four times as many threads as there are actual processor cores. Therefore, an ideal setting for this parameter, if you have oversubscribed available cores using worker threads, is 1, or half the number of CPU cores on an SMT-enabled CPU.
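The guidance above can be written as a small helper (a hypothetical heuristic for illustration, not part of any SageMaker API; the real notebook lets Inference Recommender find the value empirically):

```python
def suggested_omp_num_threads(logical_cores: int, smt_enabled: bool = True) -> int:
    """Heuristic from the discussion above: on an SMT-enabled CPU,
    half the logical core count (roughly the physical core count)
    is a reasonable starting point; otherwise use all cores."""
    if smt_enabled:
        return max(1, logical_cores // 2)
    return logical_cores

# Example: a 4-core / 8-thread (SMT) CPU vs. a 4-core non-SMT CPU
print(suggested_omp_num_threads(8))                      # 4
print(suggested_omp_num_threads(4, smt_enabled=False))   # 4
```

Either way, benchmarking over a small grid of values, as the Advanced job does below, is the reliable way to pick the final setting.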
[ ]:
instance_type = "ml.c5.xlarge"
[ ]:
import boto3
import uuid
inference_client = boto3.client("sagemaker", region)
role = get_execution_role()
advanced_job = uuid.uuid1()
advanced_response = inference_client.create_inference_recommendations_job(
    JobName=str(advanced_job),
    JobDescription="",
    JobType="Advanced",
    RoleArn=role,
    InputConfig={
        "ModelPackageVersionArn": model_package_arn,
        "JobDurationInSeconds": 7200,
        "EndpointConfigurations": [
            {
                "InstanceType": instance_type,
                "EnvironmentParameterRanges": {
                    "CategoricalParameterRanges": [
                        {"Name": "OMP_NUM_THREADS", "Value": ["1", "2", "4"]}
                    ]
                },
            }
        ],
        "ResourceLimit": {"MaxNumberOfTests": 3, "MaxParallelOfTests": 1},
        "TrafficPattern": {
            "TrafficType": "PHASES",
            "Phases": [{"InitialNumberOfUsers": 1, "SpawnRate": 1, "DurationInSeconds": 120}],
        },
    },
    StoppingConditions={
        "MaxInvocations": 1000,
        "ModelLatencyThresholds": [{"Percentile": "P95", "ValueInMilliseconds": 500}],
    },
)
print(advanced_response)
10. Custom Load Test Results
[ ]:
import boto3
import uuid
import pprint
import pandas as pd
inference_client = boto3.client("sagemaker", region)

stopped = False
while not stopped:
    inference_recommender_job = inference_client.describe_inference_recommendations_job(
        JobName=str(advanced_job)
    )
    if inference_recommender_job["Status"] in ["COMPLETED", "STOPPED", "FAILED"]:
        stopped = True
    else:
        print("Inference recommender job in progress")
        time.sleep(600)

if inference_recommender_job["Status"] == "FAILED":
    print("Inference recommender job failed")
    print("Failed Reason: {}".format(inference_recommender_job["FailureReason"]))
else:
    print("Inference recommender job completed")
Detailing out the result
[ ]:
data = [
    {**x["EndpointConfiguration"], **x["ModelConfiguration"], **x["Metrics"]}
    for x in inference_recommender_job["InferenceRecommendations"]
]
df = pd.DataFrame(data)
dropFilter = df.filter(["VariantName"])
df.drop(dropFilter, inplace=True, axis=1)
pd.set_option("display.max_colwidth", 400)
df.head()
Notebook CI Test Results
This notebook was tested in multiple regions. The test results are as follows, except for us-west-2, which is shown at the top of the notebook.