SageMaker Inference Recommender - CloudWatch Metrics




1. Introduction

SageMaker Inference Recommender is a new capability of SageMaker that reduces the time required to get machine learning (ML) models in production by automating load tests and optimizing model performance across instance types. You can use Inference Recommender to select a real-time inference endpoint that delivers the best performance at the lowest cost.

You can get started with Inference Recommender in minutes to select an instance type, and get an optimized endpoint configuration within hours, eliminating weeks of manual testing and tuning.

In this notebook, we will walk you through getting recommendations for a TensorFlow model and using information from Inference Recommender to pull additional metrics from CloudWatch.

2. Setup

Note that we are using the conda_tensorflow2_p36 kernel in SageMaker Notebook Instances, which runs Python 3.6 and TensorFlow 2.1.3. To use the same setup, open the Amazon SageMaker console from the AWS Management Console, choose Notebook Instances, and create a new notebook instance. Then upload this notebook and set the kernel. You can also run this notebook in SageMaker Studio with the TensorFlow 2.1 Python 3.6 CPU Optimized kernel.

In the next steps, you’ll import standard methods and libraries as well as set variables that will be used in this notebook. The get_execution_role function retrieves the AWS Identity and Access Management (IAM) role you created at the time of creating your notebook instance.

[ ]:
!pip install --upgrade pip awscli botocore boto3  --quiet
[ ]:
from sagemaker import get_execution_role, Session, image_uris
import boto3
import time
[ ]:
region = boto3.Session().region_name
role = get_execution_role()
sm_client = boto3.client("sagemaker", region_name=region)
sagemaker_session = Session()
print(region)

3. Machine learning model details

Inference Recommender uses metadata about your ML model to recommend the best instance types and endpoint configurations for deployment. You can provide as much or as little information as you’d like but the more information you provide, the better your recommendations will be.

ML Frameworks: TENSORFLOW, PYTORCH, XGBOOST, SAGEMAKER-SCIKIT-LEARN

ML Domains: COMPUTER_VISION, NATURAL_LANGUAGE_PROCESSING, MACHINE_LEARNING

Example ML Tasks: CLASSIFICATION, REGRESSION, IMAGE_CLASSIFICATION, OBJECT_DETECTION, SEGMENTATION, MASK_FILL, TEXT_CLASSIFICATION, TEXT_GENERATION, OTHER

Note: Select the task that is the closest match to your model. Choose OTHER if none apply.
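
A small pre-flight check can catch typos in these values before a job is submitted. The sketch below is an illustration only: the helper name check_model_metadata is hypothetical, and the allowed values are simply the ones listed above (the task list is labelled as examples, so it may not be exhaustive).

```python
# Values copied from the lists in this section. The task list is described as
# "Example ML Tasks", so it is treated here as illustrative, not exhaustive.
KNOWN_FRAMEWORKS = {"TENSORFLOW", "PYTORCH", "XGBOOST", "SAGEMAKER-SCIKIT-LEARN"}
KNOWN_DOMAINS = {"COMPUTER_VISION", "NATURAL_LANGUAGE_PROCESSING", "MACHINE_LEARNING"}
KNOWN_TASKS = {
    "CLASSIFICATION", "REGRESSION", "IMAGE_CLASSIFICATION", "OBJECT_DETECTION",
    "SEGMENTATION", "MASK_FILL", "TEXT_CLASSIFICATION", "TEXT_GENERATION", "OTHER",
}


def check_model_metadata(framework, domain, task):
    """Return a list of problems found in the metadata (empty if none).

    Hypothetical helper: it only compares against the values listed in this
    notebook and does not call any SageMaker API.
    """
    problems = []
    if framework.upper() not in KNOWN_FRAMEWORKS:
        problems.append("unknown framework: {}".format(framework))
    if domain.upper() not in KNOWN_DOMAINS:
        problems.append("unknown domain: {}".format(domain))
    if task.upper() not in KNOWN_TASKS:
        problems.append("unknown task: {} (consider OTHER)".format(task))
    return problems


# The values used in this notebook pass the check; a misspelled task does not.
print(check_model_metadata("tensorflow", "COMPUTER_VISION", "IMAGE_CLASSIFICATION"))
print(check_model_metadata("tensorflow", "COMPUTER_VISION", "IMAGE_TAGGING"))
```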

[ ]:
import tensorflow as tf

# ML framework details
framework = "tensorflow"
framework_version = tf.__version__

# model name as standardized by model zoos or a similar open source model
model_name = "resnet50"

# ML model details
ml_domain = "COMPUTER_VISION"
ml_task = "IMAGE_CLASSIFICATION"

print("TF Version", framework_version)

4. Create a model archive

SageMaker models need to be packaged in .tar.gz files. When your SageMaker Endpoint is provisioned, the files in the archive will be extracted and put in /opt/ml/model/ on the Endpoint.

In this step, there are two optional tasks to:

  1. Download a pretrained model from Keras applications

  2. Download a sample inference script (inference.py) from S3

These tasks are provided as a sample reference but can and should be modified when using your own trained models with Inference Recommender.

Optional: Download model from Keras applications

Let’s download the model from Keras applications. By setting the variable download_the_model=False, you can skip the download and provide your own model archive.

[ ]:
download_the_model = True
[ ]:
import os
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras import backend

if download_the_model:
    tf.keras.backend.set_learning_phase(0)
    model = tf.keras.applications.ResNet50()

    # Create the directory structure
    model_version = "1"
    export_dir = "./model/" + model_version
    if not os.path.exists(export_dir):
        os.makedirs(export_dir)
        print("Directory ", export_dir, " Created ")
    else:
        print("Directory ", export_dir, " already exists")
    # Save to SavedModel
    model.save(export_dir, save_format="tf", include_optimizer=False)

Create a tarball

To bring your own TensorFlow model, SageMaker expects a single archive file in .tar.gz format, containing a model file (.pb) in TF SavedModel format and the script (.py) for inference.

[ ]:
model_archive_name = "tf-model.tar.gz"
[ ]:
!tar -cvpzf {model_archive_name} ./model ./code
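
If you prefer to stay in Python rather than shell out to tar, the standard-library tarfile module can build an equivalent archive. The following is a minimal sketch, not the notebook's own method: build_model_archive and list_archive are hypothetical helper names, and the demo uses placeholder files in a temporary directory instead of a real SavedModel and inference script. Member names come out as model/... and code/... rather than ./model/... and ./code/..., which extract to the same locations.

```python
import os
import tarfile
import tempfile


def build_model_archive(archive_path, source_root, top_level_dirs=("model", "code")):
    """Pack the given directories (relative to source_root) into a .tar.gz file."""
    with tarfile.open(archive_path, "w:gz") as tar:
        for name in top_level_dirs:
            # arcname keeps paths inside the archive relative (model/..., code/...)
            tar.add(os.path.join(source_root, name), arcname=name)
    return archive_path


def list_archive(archive_path):
    """Return the sorted names of the regular files stored in the archive."""
    with tarfile.open(archive_path, "r:gz") as tar:
        return sorted(m.name for m in tar.getmembers() if m.isfile())


# Demo with placeholder files; a real archive would hold the SavedModel and
# the inference script instead.
with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "model", "1"))
    os.makedirs(os.path.join(root, "code"))
    for rel in ("model/1/saved_model.pb", "code/inference.py"):
        with open(os.path.join(root, *rel.split("/")), "w") as fh:
            fh.write("placeholder")
    archive = build_model_archive(os.path.join(root, "tf-model.tar.gz"), root)
    demo_members = list_archive(archive)

print(demo_members)
```

Listing the members before uploading is a cheap way to confirm that both the model directory and the inference code made it into the archive.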

Upload to S3

We now have a model archive ready. We need to upload it to S3 before we can use it with Inference Recommender. We will use the SageMaker Python SDK to handle the upload.

[ ]:
# model package tarball (model artifact + inference code)
model_url = sagemaker_session.upload_data(path=model_archive_name, key_prefix="tfmodel")
print("model uploaded to: {}".format(model_url))

5. Create a sample payload archive

We need to create an archive that contains individual files that Inference Recommender can send to your Endpoint. Inference Recommender will randomly sample files from this archive so make sure it contains a similar distribution of payloads you’d expect in production. Note that your inference code must be able to read in the file formats from the sample payload.

Here we are only adding four images for the example. For your own use cases, we recommend adding a variety of samples that are representative of your payloads.

[ ]:
payload_archive_name = "tf_cw_payload.tar.gz"
[ ]:
## optional: download sample images
SAMPLES_BUCKET = f"sagemaker-example-files-prod-{region}"
PREFIX = "datasets/image/pets/"
payload_location = "./sample-payload/"

if not os.path.exists(payload_location):
    os.makedirs(payload_location)
    print("Directory ", payload_location, " Created ")
else:
    print("Directory ", payload_location, " already exists")

sagemaker_session.download_data(payload_location, SAMPLES_BUCKET, PREFIX)

Create a tarball

[ ]:
!cd ./sample-payload/ && tar czvf ../{payload_archive_name} *

Upload to S3

Next, we’ll upload the packaged payload examples (tf_cw_payload.tar.gz) that were created above to S3. The S3 location will be used as input to our Inference Recommender job later in this notebook.

[ ]:
sample_payload_url = sagemaker_session.upload_data(
    path=payload_archive_name, key_prefix="tf_cw_payload"
)

6. Register model in Model Registry

In order to use Inference Recommender, you must have a versioned model in SageMaker Model Registry. To register a model in the Model Registry, you must have a model artifact packaged in a tarball and an inference container image. Registering a model includes the following steps:

  1. Create Model Group: This is a one-time task per machine learning use case. A Model Group contains one or more versions of your packaged model.

  2. Register Model Version/Package: This task is performed for each new packaged model version.

Container image URL

If you don’t have an inference container image, you can use one of the open source AWS Deep Learning Containers (DLCs) provided by AWS to serve your ML model. The code below retrieves a DLC based on your ML framework, framework version, python version, and instance type.

[ ]:
instance_type = "ml.c5.xlarge"  # Note: you can use any CPU-based instance type here, this is just to get a CPU tagged image
dlc_uri = image_uris.retrieve(
    framework,
    region,
    version=framework_version,
    py_version="py3",
    instance_type=instance_type,
    image_scope="inference",
)
dlc_uri

Create Model Group

[ ]:
model_package_group_name = "{}-cpu-models-".format(framework) + str(round(time.time()))
model_package_group_description = "{} models".format(ml_task.lower())

model_package_group_input_dict = {
    "ModelPackageGroupName": model_package_group_name,
    "ModelPackageGroupDescription": model_package_group_description,
}

create_model_package_group_response = sm_client.create_model_package_group(
    **model_package_group_input_dict
)
print(
    "ModelPackageGroup Arn : {}".format(create_model_package_group_response["ModelPackageGroupArn"])
)

Register Model Version/Package

In this step, you’ll register your pretrained model that was packaged in the prior steps as a new version in SageMaker Model Registry. First, you’ll configure the model package/version, identifying which model package group this new model should be registered within and its initial approval status. You’ll also identify the domain and task for your model. These values were set earlier in the notebook, where ml_domain = 'COMPUTER_VISION' and ml_task = 'IMAGE_CLASSIFICATION'.

Note: ModelApprovalStatus is a configuration parameter that can be used in conjunction with SageMaker Projects to trigger an automated deployment pipeline.

[ ]:
model_package_description = "{} {} inference recommender".format(framework, model_name)

model_approval_status = "PendingManualApproval"

create_model_package_input_dict = {
    "ModelPackageGroupName": model_package_group_name,
    "Domain": ml_domain.upper(),
    "Task": ml_task.upper(),
    "SamplePayloadUrl": sample_payload_url,
    "ModelPackageDescription": model_package_description,
    "ModelApprovalStatus": model_approval_status,
}

Set up inference specification

You’ll now set up the inference specification configuration for your model version. This contains information on how the model should be hosted.

Inference Recommender expects a single input MIME type for sending requests. Learn more about common inference data formats on SageMaker. This MIME type will be sent in the Content-Type header when invoking your endpoint.

[ ]:
input_mime_types = ["application/x-image"]

If you specify a set of instance types below (i.e. a non-empty list), then Inference Recommender will limit its recommendations to that set of instance types. For this example, we provide a list of common instance types used for image classification algorithms.

[ ]:
supported_realtime_inference_types = ["ml.c5.xlarge", "ml.m5.xlarge", "ml.c5d.large", "ml.m5.large"]
[ ]:
modelpackage_inference_specification = {
    "InferenceSpecification": {
        "Containers": [
            {
                "Image": dlc_uri,
                "Framework": framework.upper(),
                "FrameworkVersion": framework_version,
                "NearestModelName": model_name,
            }
        ],
        "SupportedContentTypes": input_mime_types,  # required, must be non-null
        "SupportedResponseMIMETypes": [],
        "SupportedRealtimeInferenceInstanceTypes": supported_realtime_inference_types,  # optional
    }
}

# Specify the model data
modelpackage_inference_specification["InferenceSpecification"]["Containers"][0][
    "ModelDataUrl"
] = model_url

Now that you’ve configured the model package, the next step is to create the model package/version in SageMaker Model Registry.

[ ]:
create_model_package_input_dict.update(modelpackage_inference_specification)
[ ]:
create_model_package_response = sm_client.create_model_package(**create_model_package_input_dict)
model_package_arn = create_model_package_response["ModelPackageArn"]
print("ModelPackage Version ARN : {}".format(model_package_arn))

Alternative Option: ContainerConfig

Your model package version may be missing fields that are mandatory for creating an Inference Recommender job. For example, this create_model_package_input_dict is missing Domain, Task, and SamplePayloadUrl:

create_model_package_input_dict = {
    "ModelPackageGroupName": model_package_group_name,
    "ModelPackageDescription": model_package_description,
    "ModelApprovalStatus": model_approval_status,
}

In that case, you can define Domain, Task, and SamplePayloadUrl in the optional ContainerConfig field instead:

payload_config = {
    "SamplePayloadUrl": sample_payload_url,
}

container_config = {
    "Domain": ml_domain.upper(),
    "Task": ml_task.upper(),
    "PayloadConfig": payload_config,
}

Then pass it directly to the create_inference_recommendations_job() API:

default_response = sm_client.create_inference_recommendations_job(
    JobName=default_job_name,
    JobDescription="",
    JobType="Default",
    RoleArn=role,
    InputConfig={
        "ModelPackageVersionArn": model_package_arn,
        "ContainerConfig": container_config
    },
)

For more information on what else can be provided via ContainerConfig, refer to the CreateInferenceRecommendationsJob API documentation.
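
To decide which of the two routes applies, you can check the model package input for the three fields before creating a job. This is a minimal sketch under stated assumptions: missing_recommender_fields and build_container_config are hypothetical helper names, the S3 URL is a placeholder, and the field names are the ones shown in the snippets above.

```python
# Fields this section names as mandatory for an Inference Recommender job.
REQUIRED_FOR_RECOMMENDER = ("Domain", "Task", "SamplePayloadUrl")


def missing_recommender_fields(model_package_input):
    """Return the required fields that are absent or empty in the input dict."""
    return [key for key in REQUIRED_FOR_RECOMMENDER if not model_package_input.get(key)]


def build_container_config(domain, task, sample_payload_url):
    """Build a ContainerConfig dict with the same shape as the snippet above."""
    return {
        "Domain": domain.upper(),
        "Task": task.upper(),
        "PayloadConfig": {"SamplePayloadUrl": sample_payload_url},
    }


# Placeholder values for illustration only.
package_input = {
    "ModelPackageGroupName": "example-group",
    "ModelPackageDescription": "example description",
    "ModelApprovalStatus": "PendingManualApproval",
}
missing = missing_recommender_fields(package_input)
container_config = None
if missing:
    # Supply the missing values through ContainerConfig instead.
    container_config = build_container_config(
        "computer_vision",
        "image_classification",
        "s3://example-bucket/tf_cw_payload/tf_cw_payload.tar.gz",  # placeholder URL
    )

print(missing)
print(container_config)
```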

7. Create an Inference Recommender Default Job

Now with your model in Model Registry, you can kick off a ‘Default’ job to get instance recommendations. This only requires your ModelPackageVersionArn and comes back with recommendations within 45 minutes.

The output is a list of instance type recommendations with associated environment variables, cost, throughput and latency metrics.

[ ]:
default_job_name = model_name + "-instance-" + str(round(time.time()))
job_description = "{} {}".format(framework, model_name)
job_type = "Default"
print("job_name:", default_job_name)
[ ]:
rv = sm_client.create_inference_recommendations_job(
    JobName=default_job_name,
    JobDescription=job_description,  # optional
    JobType=job_type,
    RoleArn=role,
    InputConfig={"ModelPackageVersionArn": model_package_arn},
)

print(rv)

8. Instance Recommendation Results

Each inference recommendation includes InstanceType, InitialInstanceCount, and EnvironmentParameters, which are environment variables tuned for better performance. It also includes performance and cost metrics such as MaxInvocations, ModelLatency, CostPerHour and CostPerInference. These metrics can help you narrow down to a specific endpoint configuration that suits your use case.

Examples:

If your motivation is overall price-performance with an emphasis on throughput, focus on the CostPerInference metric.
If your motivation is a balance between latency and throughput, focus on the ModelLatency and MaxInvocations metrics.

| Metric | Description |
| --- | --- |
| ModelLatency | The interval of time taken by a model to respond as viewed from SageMaker. This interval includes the local communication times taken to send the request and to fetch the response from the container of a model and the time taken to complete the inference in the container. Units: Microseconds |
| MaxInvocations | The maximum number of InvokeEndpoint requests sent to a model endpoint. Units: None |
| CostPerHour | The estimated cost per hour for your real-time endpoint. Units: US Dollars |
| CostPerInference | The estimated cost per inference for your real-time endpoint. Units: US Dollars |

[ ]:
import pprint
import pandas as pd

finished = False
while not finished:
    inference_recommender_job = sm_client.describe_inference_recommendations_job(
        JobName=default_job_name
    )
    if inference_recommender_job["Status"] in ["COMPLETED", "STOPPED", "FAILED"]:
        finished = True
    else:
        print("In progress")
        time.sleep(300)

if inference_recommender_job["Status"] == "FAILED":
    print("Inference recommender job failed ")
    print("Failed Reason: {}".format(inference_recommender_job["FailureReason"]))
else:
    print("Inference recommender job completed")
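
The polling loop above waits indefinitely. A variant with an upper bound on the wait could look like the sketch below. It is an illustration, not part of the original notebook: wait_for_job is a hypothetical helper, the describe call is passed in as a function so the logic can be exercised without AWS access, and the demo uses a fake describe function with made-up status values.

```python
import time

# Terminal statuses checked by the polling loop in this notebook.
TERMINAL_STATUSES = {"COMPLETED", "STOPPED", "FAILED"}


def wait_for_job(describe_fn, job_name, poll_seconds=300,
                 timeout_seconds=3 * 60 * 60, sleep_fn=time.sleep):
    """Poll describe_fn(JobName=...) until the job reaches a terminal status.

    Raises TimeoutError once roughly timeout_seconds have been spent sleeping.
    """
    waited = 0
    while True:
        job = describe_fn(JobName=job_name)
        if job["Status"] in TERMINAL_STATUSES:
            return job
        if waited >= timeout_seconds:
            raise TimeoutError("job {} still {} after {}s".format(
                job_name, job["Status"], waited))
        sleep_fn(poll_seconds)
        waited += poll_seconds


# Demo with a fake describe function (no AWS calls, no real sleeping).
_fake_statuses = iter(["PENDING", "IN_PROGRESS", "COMPLETED"])


def fake_describe(JobName):
    return {"JobName": JobName, "Status": next(_fake_statuses)}


demo_job = wait_for_job(fake_describe, "demo-job", poll_seconds=1,
                        sleep_fn=lambda seconds: None)
print(demo_job["Status"])
```

In the notebook, the same helper could be called as wait_for_job(sm_client.describe_inference_recommendations_job, default_job_name).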
[ ]:
data = [
    {**x["EndpointConfiguration"], **x["ModelConfiguration"], **x["Metrics"]}
    for x in inference_recommender_job["InferenceRecommendations"]
]
df = pd.DataFrame(data)
dropFilter = df.filter(["VariantName"])
df.drop(dropFilter, inplace=True, axis=1)
pd.set_option("max_colwidth", 400)
df.head()
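
Once the recommendations are available, picking one usually comes down to sorting by the metric you care about. The sketch below ranks a list shaped like the InferenceRecommendations entries used above. The helper name rank_recommendations is hypothetical, and every number in the sample data is invented for illustration; none of them are real prices or benchmark results.

```python
def rank_recommendations(recommendations, metric, ascending=True):
    """Sort recommendation dicts by one entry of their "Metrics" dict."""
    return sorted(
        recommendations,
        key=lambda rec: rec["Metrics"][metric],
        reverse=not ascending,
    )


# Invented sample data in the same shape as the job response.
sample_recommendations = [
    {
        "EndpointConfiguration": {"InstanceType": "ml.c5.xlarge", "InitialInstanceCount": 1},
        "Metrics": {"CostPerHour": 0.20, "CostPerInference": 0.000005,
                    "MaxInvocations": 680, "ModelLatency": 40},
    },
    {
        "EndpointConfiguration": {"InstanceType": "ml.m5.xlarge", "InitialInstanceCount": 1},
        "Metrics": {"CostPerHour": 0.23, "CostPerInference": 0.000009,
                    "MaxInvocations": 420, "ModelLatency": 55},
    },
    {
        "EndpointConfiguration": {"InstanceType": "ml.c5d.large", "InitialInstanceCount": 1},
        "Metrics": {"CostPerHour": 0.11, "CostPerInference": 0.000004,
                    "MaxInvocations": 270, "ModelLatency": 90},
    },
]

# Lowest cost per inference first (price-performance focus).
cheapest = rank_recommendations(sample_recommendations, "CostPerInference")[0]
# Highest throughput first.
fastest = rank_recommendations(sample_recommendations, "MaxInvocations", ascending=False)[0]

print(cheapest["EndpointConfiguration"]["InstanceType"])
print(fastest["EndpointConfiguration"]["InstanceType"])
```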

Optional: ListInferenceRecommendationsJobSteps

To see the list of subtasks for an Inference Recommender job, provide the JobName to the ListInferenceRecommendationsJobSteps API.

For more information, refer to the ListInferenceRecommendationsJobSteps API documentation.

[ ]:
list_job_steps_response = sm_client.list_inference_recommendations_job_steps(
    JobName=default_job_name
)
print(list_job_steps_response)

9. Custom Load Test

With an ‘Advanced’ job, you can provide your production requirements, select instance types, tune environment variables and perform more extensive load tests. This typically takes 2 hours depending on your traffic pattern and number of instance types.

The output is a list of endpoint configuration recommendations (instance type, instance count, environment variables) with associated cost, throughput and latency metrics.

In the example below, we tune the endpoint against the environment variable OMP_NUM_THREADS with three values [2, 4, 10], and we aim for a 95th percentile latency of no more than 100 ms.

[ ]:
job_type = "Advanced"
advanced_job_name = (
    model_name + "-" + instance_type.replace(".", "-") + "-load-test-" + str(round(time.time()))
)
advanced_job_description = "{} {} on {}".format(framework, model_name, instance_type)

Instance configurations

[ ]:
instance_type = "ml.c5.xlarge"
[ ]:
env_parameter_ranges = [{"Name": "OMP_NUM_THREADS", "Value": ["2", "4", "10"]}]

Stopping conditions

[ ]:
# P95 latency (ms)
p95_model_latency = 100
# throughput
max_invocations = 300

Traffic pattern

[ ]:
traffic_pattern_phases = [{"InitialNumberOfUsers": 1, "SpawnRate": 1, "DurationInSeconds": 120}]
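
To reason about how much load a set of phases generates, it can help to summarize them before submitting the job. The sketch below is a rough estimate under an explicit assumption: it treats SpawnRate as the number of new users added per minute, which you should confirm against the CreateInferenceRecommendationsJob API reference. The helper name summarize_phases is hypothetical.

```python
def summarize_phases(phases):
    """Estimate total duration and peak concurrent users for traffic phases.

    Assumption (verify against the API reference): SpawnRate is the number of
    new users added per minute within a phase.
    """
    total_seconds = sum(phase["DurationInSeconds"] for phase in phases)
    peak_users = max(
        phase["InitialNumberOfUsers"]
        + phase["SpawnRate"] * phase["DurationInSeconds"] // 60
        for phase in phases
    )
    return {"TotalDurationInSeconds": total_seconds, "EstimatedPeakUsers": peak_users}


# Same single phase as in the cell above.
phases = [{"InitialNumberOfUsers": 1, "SpawnRate": 1, "DurationInSeconds": 120}]
phase_summary = summarize_phases(phases)
print(phase_summary)
```

Comparing the total phase duration with the job duration limit is a quick sanity check that the traffic pattern fits within the job.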

Resource limits

[ ]:
# total duration of Advanced job in seconds
job_duration = 7200
# The total number of endpoint configurations must be less than or equal to MaxNumberOfTests
max_number_of_tests = 10
max_parallel_tests = 1
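
A quick way to check the configuration count against MaxNumberOfTests is to expand the categorical ranges yourself. The sketch below assumes that each combination of categorical values, per instance type, counts as one endpoint configuration; that counting rule is an assumption for illustration, and expand_categorical_ranges is a hypothetical helper name.

```python
from itertools import product


def expand_categorical_ranges(ranges):
    """Return every combination of the categorical environment values."""
    names = [r["Name"] for r in ranges]
    value_lists = [r["Value"] for r in ranges]
    return [dict(zip(names, combo)) for combo in product(*value_lists)]


# Same range and limit as in this notebook.
env_parameter_ranges = [{"Name": "OMP_NUM_THREADS", "Value": ["2", "4", "10"]}]
instance_types = ["ml.c5.xlarge"]
max_number_of_tests = 10

combinations = expand_categorical_ranges(env_parameter_ranges)
# Assumed counting rule: one configuration per (instance type, combination).
configuration_count = len(instance_types) * len(combinations)

print(combinations)
print(configuration_count <= max_number_of_tests)
```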
[ ]:
advanced_response = sm_client.create_inference_recommendations_job(
    JobName=advanced_job_name,
    JobDescription=advanced_job_description,
    JobType=job_type,
    RoleArn=role,
    InputConfig={
        "ModelPackageVersionArn": model_package_arn,
        "JobDurationInSeconds": job_duration,
        "EndpointConfigurations": [
            {
                "InstanceType": instance_type,
                "EnvironmentParameterRanges": {"CategoricalParameterRanges": env_parameter_ranges},
            }
        ],
        "ResourceLimit": {
            "MaxNumberOfTests": max_number_of_tests,
            "MaxParallelOfTests": max_parallel_tests,
        },
        "TrafficPattern": {"TrafficType": "PHASES", "Phases": traffic_pattern_phases},
    },
    StoppingConditions={
        "MaxInvocations": max_invocations,
        "ModelLatencyThresholds": [{"Percentile": "P95", "ValueInMilliseconds": p95_model_latency}],
    },
)

print(advanced_response)

10. Custom Load Test Results

Inference Recommender benchmarks each endpoint configuration. Analyzing the load test results, we find:

OMP_NUM_THREADS = 2 shows ~20% better throughput than OMP_NUM_THREADS = 4
OMP_NUM_THREADS = 2 shows a 25% saving in inference cost compared to OMP_NUM_THREADS = 4

Overall, OMP_NUM_THREADS = 2 is a much better endpoint configuration than OMP_NUM_THREADS = 4.
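
Percentages like these are plain relative differences between two configurations. The sketch below shows the arithmetic with made-up inputs chosen to give round results; they are not the measurements behind the figures above, and the function names are hypothetical.

```python
def percent_increase(new_value, baseline):
    """Relative gain of new_value over baseline, in percent."""
    return 100.0 * (new_value - baseline) / baseline


def percent_saving(new_value, baseline):
    """Relative reduction of new_value below baseline, in percent."""
    return 100.0 * (baseline - new_value) / baseline


# Made-up numbers for illustration only.
throughput_gain = percent_increase(120, 100)   # e.g. MaxInvocations of two configs
cost_saving = percent_saving(0.75, 1.00)       # e.g. relative CostPerInference

print(throughput_gain, cost_saving)
```

When you run the job yourself, substitute the MaxInvocations and CostPerInference values from your own results table.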

[ ]:
finished = False
while not finished:
    inference_recommender_job = sm_client.describe_inference_recommendations_job(
        JobName=advanced_job_name
    )
    if inference_recommender_job["Status"] in ["COMPLETED", "STOPPED", "FAILED"]:
        finished = True
    else:
        print("In progress")
        time.sleep(300)

if inference_recommender_job["Status"] == "FAILED":
    print("Inference recommender job failed")
    print("Failure Reason: {}".format(inference_recommender_job["FailureReason"]))
else:
    print("Inference recommender job completed")
[ ]:
data = [
    {**x["EndpointConfiguration"], **x["ModelConfiguration"], **x["Metrics"]}
    for x in inference_recommender_job["InferenceRecommendations"]
]
df = pd.DataFrame(data)
dropFilter = df.filter(["VariantName"])
df.drop(dropFilter, inplace=True, axis=1)
pd.set_option("max_colwidth", 400)
df.head()

11. Review detailed CloudWatch metrics

Below we’ve provided a helper script, cloudwatch.py, that you can import to get access to functions for reviewing CloudWatch metrics. By providing your region and Inference Recommender job name, you can get detailed charts of your endpoints during the load test.

| Metric | Description |
| --- | --- |
| OverheadLatency | The interval of time added to the time taken to respond to a client request by SageMaker overheads. This interval is measured from the time SageMaker receives the request until it returns a response to the client, minus the ModelLatency. Overhead latency can vary depending on multiple factors, including request and response payload sizes, request frequency, and authentication/authorization of the request. Units: Microseconds |
| CPUUtilization | The sum of each individual CPU core’s utilization. The CPU utilization of each core range is 0–100. For example, if there are four CPUs, the CPUUtilization range is 0%–400%. For processing jobs, the value is the CPU utilization of the processing container on the instance. For training jobs, the value is the CPU utilization of the algorithm container on the instance. For batch transform jobs, the value is the CPU utilization of the transform container on the instance. For endpoint variants, the value is the sum of the CPU utilization of the primary and supplementary containers on the instance. Note: For multi-instance jobs, each instance reports CPU utilization metrics. However, the default view in CloudWatch shows the average CPU utilization across all instances. Units: Percent |
| MemoryUtilization | The percentage of memory that is used by the containers on an instance. This value range is 0%–100%. For processing jobs, the value is the memory utilization of the processing container on the instance. For training jobs, the value is the memory utilization of the algorithm container on the instance. For batch transform jobs, the value is the memory utilization of the transform container on the instance. For endpoint variants, the value is the sum of the memory utilization of the primary and supplementary containers on the instance. Units: Percent |

Learn more about CloudWatch: https://docs.aws.amazon.com/sagemaker/latest/dg/monitoring-cloudwatch.html
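
Two details in the table are easy to trip over when reading the charts: latency metrics are reported in microseconds, and CPUUtilization is a sum across cores, so it can exceed 100%. The sketch below shows the conversions. The helper names and the sample values are illustrative assumptions, not output from a real endpoint.

```python
def microseconds_to_milliseconds(value_us):
    """Convert a latency reported in microseconds to milliseconds."""
    return value_us / 1000.0


def per_core_cpu_utilization(cpu_utilization_sum, core_count):
    """Scale the summed CPUUtilization (0 to 100 * core_count) to a 0-100 range."""
    return cpu_utilization_sum / core_count


# Illustrative values only.
model_latency_ms = microseconds_to_milliseconds(85000)   # 85,000 microseconds
average_core_load = per_core_cpu_utilization(260.0, 4)   # 260% summed over 4 cores

print(model_latency_ms, average_core_load)
```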

[ ]:
from cloudwatch import get_endpoint_metrics
[ ]:
cw_client = boto3.client("cloudwatch", region)
df_cw = get_endpoint_metrics(sm_client, cw_client, region, default_job_name, include_plots=True)

Below is an example of the CloudWatch metrics and graphs returned for a single endpoint benchmarked by Inference Recommender.

[Image: CloudWatch metric charts for a single benchmarked endpoint]

11b. View Inference Recommender CloudWatch metrics

You can also observe load test metrics as seen by Inference Recommender. Inference Recommender publishes the following metrics to the /aws/sagemaker/InferenceRecommendationsJobs CloudWatch namespace.

| Metric | Description |
| --- | --- |
| ClientInvocations | The number of InvokeEndpoint requests sent to a model endpoint as seen by Inference Recommender. To get the total number of requests sent to a model endpoint, use the Sum statistic. Units: None. Valid statistics: Sum |
| ClientInvocationErrors | The number of InvokeEndpoint requests that failed as seen by Inference Recommender. Units: None. Valid statistics: Sum |
| ClientLatency | The interval of time taken by a model to respond as seen by Inference Recommender. Note that the time is in milliseconds, whereas the ModelLatency endpoint invocation metric is in microseconds. Units: Milliseconds. Valid statistics: Average, Sum, Min, Max, Sample Count, percentiles |
| NumberOfUsers | The number of users sending InvokeEndpoint requests to a model endpoint. Units: None. Valid statistics: Max, Min, Average |

We will graph the “ClientInvocations” metric of the first recommendation in the job result as an example.

[ ]:
from PIL import Image
from io import BytesIO
import json

cloudwatch_client = boto3.client("cloudwatch")

endpoint_name = inference_recommender_job["InferenceRecommendations"][0]["EndpointConfiguration"][
    "EndpointName"
]
widget = {
    "metrics": [
        [
            "/aws/sagemaker/InferenceRecommendationsJobs",
            "ClientInvocations",
            "JobName",
            advanced_job_name,
            "EndpointName",
            endpoint_name,
        ]
    ],
    "title": "ClientInvocations",
    "view": "timeSeries",
    "start": "-PT1H",
    "end": "P0D",
    "stat": "Sum",
    "period": 60,
}

widget_string = json.dumps(widget)

response = cloudwatch_client.get_metric_widget_image(MetricWidget=widget_string)

Image.open(BytesIO(response["MetricWidgetImage"]))
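
The same widget definition works for the other metrics in the table if you change the metric name and statistic. A small builder keeps that in one place. This is a sketch: build_metric_widget is a hypothetical helper, the job and endpoint names in the demo are placeholders, and the namespace and dimension names are the ones used in the cell above.

```python
import json

NAMESPACE = "/aws/sagemaker/InferenceRecommendationsJobs"


def build_metric_widget(metric_name, job_name, endpoint_name,
                        stat="Sum", period=60, start="-PT1H", end="P0D"):
    """Return the JSON string for a single-metric time series widget."""
    widget = {
        "metrics": [
            [NAMESPACE, metric_name, "JobName", job_name, "EndpointName", endpoint_name]
        ],
        "title": metric_name,
        "view": "timeSeries",
        "start": start,
        "end": end,
        "stat": stat,
        "period": period,
    }
    return json.dumps(widget)


# Placeholder job and endpoint names for illustration.
latency_widget = build_metric_widget(
    "ClientLatency", "example-job", "example-endpoint", stat="Average"
)
print(latency_widget)
```

The returned string can be passed as the MetricWidget argument of get_metric_widget_image, in the same way as widget_string in the cell above.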

12. Conclusion

This notebook discussed how to use SageMaker Inference Recommender with a TensorFlow model to help determine the right CPU instance type, instance count and environment variables to reduce costs and maximize performance. The notebook walked you through downloading a pre-trained TensorFlow model, registering your model in Model Registry, creating an Inference Recommender Default job, and creating an Inference Recommender Advanced job to get recommendations. In addition, you can use the supplied helper functions to review CloudWatch metrics right from your notebook. You can modify the environment variables and instance types to match your own ML workload as well as bring your own TensorFlow model for testing.
