End-to-End Multiclass Image Classification Example

  1. Introduction

  2. Prerequisites and Preprocessing

  3. Permissions and environment variables

  4. Training the ResNet model

  5. Deploy The Model

  6. Create model

  7. Batch transform

  8. Realtime inference

    1. Create endpoint configuration

    2. Create endpoint

    3. Perform inference

    4. Clean up


Welcome to our end-to-end example of the distributed image classification algorithm. In this demo, we will use the Amazon SageMaker image classification algorithm to train on the Caltech-256 dataset.

To get started, we need to set up the environment with a few prerequisite steps, for permissions, configurations, and so on.

Prerequisites and Preprocessing

Permissions and environment variables

Here we set up the linkage and authentication to AWS services. There are three parts to this:

  • The role used to give learning and hosting access to your data. This is obtained automatically from the role used to start the notebook.

  • The S3 bucket that you want to use for training and model data.

  • The Amazon SageMaker image classification Docker image, which need not be changed.

[ ]:
import boto3
import re
import sagemaker
from sagemaker import get_execution_role
from sagemaker import image_uris

role = get_execution_role()

bucket = sagemaker.Session().default_bucket()

training_image = image_uris.retrieve(
    region=boto3.Session().region_name, framework="image-classification"
)

Data preparation

Download the data and transfer to S3 for use in training.

[ ]:
import boto3

s3_client = boto3.client("s3")

def upload_to_s3(channel, file):
    s3 = boto3.resource("s3")
    key = channel + "/" + file
    # open the file in a context manager so the handle is closed after the upload
    with open(file, "rb") as data:
        s3.Bucket(bucket).put_object(Key=key, Body=data)

# caltech-256: the two .rec files below are assumed to be present in the working directory
s3_train_key = "image-classification-full-training/train"
s3_validation_key = "image-classification-full-training/validation"
s3_train = "s3://{}/{}/".format(bucket, s3_train_key)
s3_validation = "s3://{}/{}/".format(bucket, s3_validation_key)

upload_to_s3(s3_train_key, "caltech-256-60-train.rec")
upload_to_s3(s3_validation_key, "caltech-256-60-val.rec")

Training the ResNet model

In this demo, we use the Caltech-256 dataset, which contains 30608 images of 256 objects. For the training and validation data, we follow the splitting scheme in this MXNet example. In particular, it randomly selects 60 images per class for training and uses the remaining data for validation. The algorithm takes a RecordIO file as input. The user can also provide the image files as input, which will be converted into RecordIO format using MXNet’s im2rec tool. It takes under a minute to convert the entire Caltech-256 dataset (~1.2GB) on a p3.2xlarge instance. However, for this demo, we will use the RecordIO format.
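The per-class split described above can be sketched in plain Python. This is a minimal illustration and not the MXNet example's actual script: the function names `split_per_class` and `to_lst_lines`, the seed, and the toy class and file names are all assumptions made for the sketch. The tab-separated `index, label, path` row layout is the list-file layout that im2rec reads.

```python
import random


def split_per_class(files_by_class, train_per_class=60, seed=0):
    """Pick `train_per_class` random images per class for training; the rest go to validation.

    `files_by_class` maps a class folder name to a list of image paths.
    Returns two lists of (label_index, path) tuples.
    """
    rng = random.Random(seed)
    train, val = [], []
    # Sort class names so that label indices are stable across runs.
    for label, cls in enumerate(sorted(files_by_class)):
        files = sorted(files_by_class[cls])
        rng.shuffle(files)
        train += [(label, path) for path in files[:train_per_class]]
        val += [(label, path) for path in files[train_per_class:]]
    return train, val


def to_lst_lines(records):
    """Format records as list-file rows: index<TAB>label<TAB>relative path."""
    return ["{}\t{}\t{}".format(i, label, path) for i, (label, path) in enumerate(records)]


# Toy data: two hypothetical classes with 80 and 100 image files.
toy = {
    "class-a": ["class-a/{:04d}.jpg".format(i) for i in range(80)],
    "class-b": ["class-b/{:04d}.jpg".format(i) for i in range(100)],
}
train, val = split_per_class(toy)
print(len(train), len(val))  # 120 60
print(to_lst_lines(train)[0])
```

With 60 images taken from each class, the training set size is always 60 times the number of classes, which is where the `num_training_samples` value used later comes from.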

Once we have the data available in the correct format for training, the next step is to actually train the model using the data. After setting training parameters, we kick off training, and poll for status until training is completed.

Training parameters

There are two kinds of parameters that need to be set for training. The first kind is the parameters for the training job. These include:

  • Input specification: These are the training and validation channels that specify the path where training data is present. They are specified in the “InputDataConfig” section. The main parameters that need to be set are the “ContentType”, which depends on the input data format (this demo uses “application/x-recordio” for RecordIO files), and the “S3Uri”, which specifies the bucket and the folder where the data is present.

  • Output specification: This is specified in the “OutputDataConfig” section. We just need to specify the path where the output can be stored after training.

  • Resource config: This section specifies the type of instance on which to run the training and the number of hosts used for training. If “InstanceCount” is more than 1, then training can be run in a distributed manner.
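Because the train and validation channels differ only in name and S3 location, the input specification can be built with a small helper. The sketch below is one possible way to do it: `make_channel` and the bucket name `my-example-bucket` are hypothetical, while the dictionary keys and values mirror the ones used in this notebook's training request.

```python
def make_channel(name, s3_uri, content_type="application/x-recordio"):
    """Build one entry of "InputDataConfig" for a fully replicated S3 prefix."""
    return {
        "ChannelName": name,
        "DataSource": {
            "S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": s3_uri,
                "S3DataDistributionType": "FullyReplicated",
            }
        },
        "ContentType": content_type,
        "CompressionType": "None",
    }


# Hypothetical bucket name, for illustration only.
example_bucket = "my-example-bucket"
prefix = "s3://{}/image-classification-full-training".format(example_bucket)

input_data_config = [
    make_channel("train", prefix + "/train/"),
    make_channel("validation", prefix + "/validation/"),
]
# Raising InstanceCount above 1 is what switches the job to distributed training.
resource_config = {"InstanceCount": 1, "InstanceType": "ml.p3.2xlarge", "VolumeSizeInGB": 50}

print([channel["ChannelName"] for channel in input_data_config])  # ['train', 'validation']
```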

Apart from the above set of parameters, there are hyperparameters that are specific to the algorithm. These are:

  • num_layers: The number of layers (depth) for the network. We use 18 in this sample, but other values such as 50, 101, and 152 can be used.

  • num_training_samples: This is the total number of training samples. It is set to 15420 for the Caltech-256 dataset with the current split (60 images for each of the 257 classes).

  • num_classes: This is the number of output classes for the new dataset. ImageNet was trained with 1000 output classes, but the number of output classes can be changed for fine-tuning. For Caltech-256, we use 257 because it has 256 object categories + 1 clutter class.

  • epochs: Number of training epochs.

  • learning_rate: Learning rate for training.

  • mini_batch_size: The number of training samples used for each mini batch. In distributed training, the number of training samples used per batch will be N * mini_batch_size, where N is the number of hosts on which training is run.
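The numbers in the list above are related by simple arithmetic, worked through below. The helper `steps_per_epoch` is a hypothetical name, and rounding the last partial batch up is an assumption of this sketch; the algorithm may handle a trailing partial batch differently.

```python
import math

images_per_class = 60  # training images drawn from each class
num_classes = 257      # 256 object categories + 1 clutter class
mini_batch_size = 64

num_training_samples = images_per_class * num_classes
print(num_training_samples)  # 15420


def steps_per_epoch(num_samples, mini_batch_size, num_hosts=1):
    """Batches per epoch when every host contributes `mini_batch_size` samples to each batch."""
    effective_batch = num_hosts * mini_batch_size
    return math.ceil(num_samples / effective_batch)


print(steps_per_epoch(num_training_samples, mini_batch_size, num_hosts=1))  # 241
print(steps_per_epoch(num_training_samples, mini_batch_size, num_hosts=2))  # 121
```

Doubling the number of hosts doubles the effective batch size, so the number of batches per epoch is roughly halved.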

After setting training parameters, we kick off training and poll for status until training is completed, which in this example takes between 3 and 5 minutes per epoch on a p3.2xlarge machine. The network typically converges after 10 epochs. However, to save training time, we set the epochs to 2; keep in mind that this may not be sufficient to generate a good model.

[ ]:
# The algorithm supports multiple network depths (number of layers): 18, 34, 50, 101, 152 and 200
# For this training, we will use 18 layers
num_layers = "18"
# we need to specify the input image shape for the training data
image_shape = "3,224,224"
# we also need to specify the number of training samples in the training set
# for caltech it is 15420
num_training_samples = "15420"
# specify the number of output classes
num_classes = "257"
# batch size for training
mini_batch_size = "64"
# number of epochs
epochs = "2"
# learning rate
learning_rate = "0.01"


Run the training using the Amazon SageMaker CreateTrainingJob API

[ ]:
import time
import boto3
from time import gmtime, strftime

s3 = boto3.client("s3")
# create unique job name
job_name_prefix = "DEMO-imageclassification"
job_name = job_name_prefix + time.strftime("-%Y-%m-%d-%H-%M-%S", time.gmtime())
training_params = {
    # specify the training image
    "AlgorithmSpecification": {"TrainingImage": training_image, "TrainingInputMode": "File"},
    "RoleArn": role,
    "OutputDataConfig": {"S3OutputPath": "s3://{}/{}/output".format(bucket, job_name_prefix)},
    "ResourceConfig": {"InstanceCount": 1, "InstanceType": "ml.p3.2xlarge", "VolumeSizeInGB": 50},
    "TrainingJobName": job_name,
    "HyperParameters": {
        "image_shape": image_shape,
        "num_layers": str(num_layers),
        "num_training_samples": str(num_training_samples),
        "num_classes": str(num_classes),
        "mini_batch_size": str(mini_batch_size),
        "epochs": str(epochs),
        "learning_rate": str(learning_rate),
    },
    "StoppingCondition": {"MaxRuntimeInSeconds": 360000},
    # Training data should be inside a subdirectory called "train"
    # Validation data should be inside a subdirectory called "validation"
    # The algorithm currently only supports FullyReplicated mode (where data is copied onto each machine)
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": s3_train,
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "application/x-recordio",
            "CompressionType": "None",
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": s3_validation,
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "application/x-recordio",
            "CompressionType": "None",
        },
    ],
}
print("Training job name: {}".format(job_name))
print("\nInput Data Location: {}".format(s3_train))
[ ]:
# create the Amazon SageMaker training job
# note: this client reuses the name "sagemaker" and shadows the module imported earlier
sagemaker = boto3.client(service_name="sagemaker")
sagemaker.create_training_job(**training_params)

# confirm that the training job has started
status = sagemaker.describe_training_job(TrainingJobName=job_name)["TrainingJobStatus"]
print("Training job current status: {}".format(status))

try:
    # wait for the job to finish and report the ending status
    sagemaker.get_waiter("training_job_completed_or_stopped").wait(TrainingJobName=job_name)
    training_info = sagemaker.describe_training_job(TrainingJobName=job_name)
    status = training_info["TrainingJobStatus"]
    print("Training job ended with status: " + status)
except Exception:
    print("Training failed to start")
    # if exception is raised, that means it has failed
    message = sagemaker.describe_training_job(TrainingJobName=job_name)["FailureReason"]
    print("Training failed with the following error: {}".format(message))
[ ]:
training_info = sagemaker.describe_training_job(TrainingJobName=job_name)
status = training_info["TrainingJobStatus"]
print("Training job ended with status: " + status)

If you see the message,

Training job ended with status: Completed

then that means training successfully completed and the output model was stored in the output path specified by training_params['OutputDataConfig'].

You can also view information about a training job and its status in the Amazon SageMaker console. Just click on the “Jobs” tab.
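The same information can be pulled out of the `describe_training_job` response programmatically. The sketch below assumes a helper named `summarize_training_job` (hypothetical) and runs against a hand-written stand-in dictionary, so the S3 path and timing in it are made up; the keys it reads are fields of the DescribeTrainingJob response.

```python
def summarize_training_job(info):
    """Extract the fields of a DescribeTrainingJob response that matter for deployment."""
    status = info["TrainingJobStatus"]
    summary = {"status": status}
    if status == "Completed":
        summary["model_artifacts"] = info["ModelArtifacts"]["S3ModelArtifacts"]
    elif status == "Failed":
        summary["failure_reason"] = info.get("FailureReason", "unknown")
    # The timing field is only present once the job has run; .get avoids a KeyError otherwise.
    summary["training_seconds"] = info.get("TrainingTimeInSeconds")
    return summary


# Stand-in for sagemaker.describe_training_job(TrainingJobName=job_name); values are made up.
fake_info = {
    "TrainingJobStatus": "Completed",
    "ModelArtifacts": {"S3ModelArtifacts": "s3://my-example-bucket/output/model.tar.gz"},
    "TrainingTimeInSeconds": 540,
}
summary = summarize_training_job(fake_info)
print(summary["status"], summary["model_artifacts"])
```

In the notebook, the same function could be called on `training_info` directly.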

Deploy The Model

A trained model does nothing on its own. We now want to use the model to perform inference. For this example, that means predicting the class of a given image.

This section involves several steps:

  1. Create Model - Create model for the training output

  2. Batch Transform - Create a transform job to perform batch inference.

  3. Host the model for realtime inference - Create an inference endpoint and perform realtime inference.

Create Model

We now create a SageMaker Model from the training output. Using the model we can create a Batch Transform Job or an Endpoint Configuration.

[ ]:
import time
import boto3

sage = boto3.Session().client(service_name="sagemaker")

model_name = "DEMO-full-image-classification-model" + time.strftime(
    "-%Y-%m-%d-%H-%M-%S", time.gmtime()
)
# locate the model artifacts written by the training job
info = sage.describe_training_job(TrainingJobName=job_name)
model_data = info["ModelArtifacts"]["S3ModelArtifacts"]

hosting_image = image_uris.retrieve(
    region=boto3.Session().region_name, framework="image-classification"
)

primary_container = {
    "Image": hosting_image,
    "ModelDataUrl": model_data,
}

create_model_response = sage.create_model(
    ModelName=model_name, ExecutionRoleArn=role, PrimaryContainer=primary_container
)

Batch transform

We now create a SageMaker Batch Transform job using the model created above to perform batch prediction.

Download test data

[ ]:
# Download images under /008.bathtub
!aws s3 sync s3://sagemaker-sample-files/datasets/image/caltech-256/256_ObjectCategories/008.bathtub/ /tmp/images/008.bathtub/
[ ]:
batch_input = "s3://{}/image-classification-full-training/test/".format(bucket)
test_images = "/tmp/images/008.bathtub"

!aws s3 cp $test_images $batch_input --recursive --quiet
[ ]:
timestamp = time.strftime("-%Y-%m-%d-%H-%M-%S", time.gmtime())
batch_job_name = "image-classification-model" + timestamp
request = {
    "TransformJobName": batch_job_name,
    "ModelName": model_name,
    "MaxConcurrentTransforms": 16,
    "MaxPayloadInMB": 6,
    "BatchStrategy": "SingleRecord",
    "TransformOutput": {"S3OutputPath": "s3://{}/{}/output".format(bucket, batch_job_name)},
    "TransformInput": {
        "DataSource": {"S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": batch_input}},
        "ContentType": "application/x-image",
        "SplitType": "None",
        "CompressionType": "None",
    },
    "TransformResources": {"InstanceType": "ml.c5.xlarge", "InstanceCount": 1},
}

print("Transform job name: {}".format(batch_job_name))
print("\nInput Data Location: {}".format(batch_input))
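Since the request sets `MaxPayloadInMB` to 6 and sends one image per request, it can be worth checking that no test image exceeds that size before starting the job. The following is a sketch of such a check: `find_oversized_files` is a hypothetical helper, and treating 1 MB as 1024 * 1024 bytes is an assumption. The demonstration uses a temporary directory with dummy files so it runs anywhere.

```python
import os
import tempfile


def find_oversized_files(directory, max_payload_mb=6):
    """Return (path, size_in_bytes) for every file larger than the payload limit."""
    limit = max_payload_mb * 1024 * 1024
    oversized = []
    for root, _dirs, files in os.walk(directory):
        for name in sorted(files):
            path = os.path.join(root, name)
            size = os.path.getsize(path)
            if size > limit:
                oversized.append((path, size))
    return oversized


# Self-contained demonstration on a temporary directory with two dummy files.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "small.jpg"), "wb") as f:
        f.write(b"\0" * 1024)  # 1 KB
    with open(os.path.join(tmp, "large.jpg"), "wb") as f:
        f.write(b"\0" * (7 * 1024 * 1024))  # 7 MB
    demo_result = [os.path.basename(p) for p, _ in find_oversized_files(tmp)]

print(demo_result)  # ['large.jpg']
```

In the notebook, the directory to check would be the local `test_images` folder.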
[ ]:
sagemaker = boto3.client("sagemaker")
sagemaker.create_transform_job(**request)

print("Created Transform job with name: ", batch_job_name)

# poll the job status until it reaches a terminal state
while True:
    response = sagemaker.describe_transform_job(TransformJobName=batch_job_name)
    status = response["TransformJobStatus"]
    if status == "Completed":
        print("Transform job ended with status: " + status)
        break
    if status == "Failed":
        message = response["FailureReason"]
        print("Transform failed with the following error: {}".format(message))
        raise Exception("Transform job failed")
    time.sleep(30)
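The polling loop above has no upper bound on how long it waits. A variant with a timeout might look like the sketch below. The helper `wait_for_status` and its parameters are assumptions of this sketch, not part of the SageMaker API. It takes a callable that returns the current status, which keeps it independent of any particular client and lets the demonstration run with a scripted status sequence.

```python
import time


def wait_for_status(get_status, done=("Completed",), failed=("Failed", "Stopped"),
                    poll_seconds=30, timeout_seconds=3600, sleep=time.sleep):
    """Poll `get_status()` until it returns a terminal status or the timeout expires."""
    waited = 0
    while True:
        status = get_status()
        if status in done:
            return status
        if status in failed:
            raise RuntimeError("Job ended with status: " + status)
        if waited >= timeout_seconds:
            raise TimeoutError("Gave up after {} seconds; last status: {}".format(waited, status))
        sleep(poll_seconds)
        waited += poll_seconds


# Demonstration with a scripted status sequence and a no-op sleep, so it returns immediately.
statuses = iter(["InProgress", "InProgress", "Completed"])
final_status = wait_for_status(lambda: next(statuses), sleep=lambda seconds: None)
print(final_status)  # Completed
```

For the transform job in this notebook, `get_status` could be a lambda that returns `sagemaker.describe_transform_job(TransformJobName=batch_job_name)["TransformJobStatus"]`.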

After the job completes, let’s inspect the prediction results. The accuracy may not be very good because we set the epochs to 2 during training, which may not be sufficient to train a good model.

[ ]:
from urllib.parse import urlparse
import json
import numpy as np

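# Class names in label-index order (257 entries for Caltech-256).
# The list contents are elided in this copy and must be filled in before running.
object_categories = []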

def list_objects(s3_client, bucket, prefix):
    response = s3_client.list_objects(Bucket=bucket, Prefix=prefix)
    objects = [content["Key"] for content in response["Contents"]]
    return objects

def get_label(s3_client, bucket, prefix):
    filename = prefix.split("/")[-1]
    s3_client.download_file(bucket, prefix, filename)
    with open(filename) as f:
        data = json.load(f)
        index = np.argmax(data["prediction"])
        probability = data["prediction"][index]
    print("Result: label - " + object_categories[index] + ", probability - " + str(probability))
    return object_categories[index], probability

inputs = list_objects(s3_client, bucket, urlparse(batch_input).path.lstrip("/"))
print("Sample inputs: " + str(inputs[:2]))

outputs = list_objects(s3_client, bucket, batch_job_name + "/output")
print("Sample output: " + str(outputs[:2]))

# Check prediction result of the first 10 images
[get_label(s3_client, bucket, prefix) for prefix in outputs[0:10]]
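Because every test image comes from the `008.bathtub` folder, the predicted labels can be scored against the single expected label, and looking at the top few classes rather than only the best one gives a better sense of an undertrained model. The sketch below uses a toy four-class list and made-up probability vectors; `top_k` and `accuracy` are hypothetical helper names.

```python
def top_k(probabilities, categories, k=3):
    """Return the k most probable (label, probability) pairs, best first."""
    ranked = sorted(range(len(probabilities)), key=lambda i: probabilities[i], reverse=True)
    return [(categories[i], probabilities[i]) for i in ranked[:k]]


def accuracy(predicted_labels, true_label):
    """Fraction of predictions equal to the single expected label."""
    if not predicted_labels:
        return 0.0
    return sum(label == true_label for label in predicted_labels) / len(predicted_labels)


# Toy stand-ins: a 4-class category list and two made-up prediction vectors.
toy_categories = ["ak47", "bathtub", "bear", "clutter"]
toy_predictions = [[0.1, 0.6, 0.2, 0.1], [0.5, 0.3, 0.1, 0.1]]

best = [top_k(p, toy_categories, k=1)[0][0] for p in toy_predictions]
print(top_k(toy_predictions[0], toy_categories, k=2))  # [('bathtub', 0.6), ('bear', 0.2)]
print(accuracy(best, "bathtub"))  # 0.5
```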

Realtime inference

We now host the model with an endpoint and perform realtime inference.

This section involves several steps:

  1. Create endpoint configuration - Create a configuration defining an endpoint.

  2. Create endpoint - Use the configuration to create an inference endpoint.

  3. Perform inference - Perform inference on some input data using the endpoint.

  4. Clean up - Delete the endpoint and model.

Create Endpoint Configuration

SageMaker hosting supports REST endpoints backed by multiple models, e.g. for A/B testing purposes. To support this, customers create an endpoint configuration that describes the distribution of traffic across the models, whether split, shadowed, or sampled in some way.

In addition, the endpoint configuration describes the instance type required for model deployment, and at launch will describe the autoscaling configuration.
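To make the traffic distribution concrete, the sketch below builds two production variants for a hypothetical A/B test and computes the share of requests each would receive. `make_variant`, `traffic_shares`, and the model names are assumptions of this sketch; `InitialVariantWeight` is the field of a `ProductionVariants` entry that sets a variant's relative weight. This notebook itself deploys a single variant that receives all traffic.

```python
def make_variant(name, model_name, weight, instance_type="ml.m4.xlarge", count=1):
    """Build one "ProductionVariants" entry for an endpoint configuration."""
    return {
        "VariantName": name,
        "ModelName": model_name,
        "InstanceType": instance_type,
        "InitialInstanceCount": count,
        "InitialVariantWeight": weight,
    }


def traffic_shares(variants):
    """Each variant receives its weight divided by the sum of all weights."""
    total = sum(v["InitialVariantWeight"] for v in variants)
    return {v["VariantName"]: v["InitialVariantWeight"] / total for v in variants}


# Hypothetical model names for an A/B test that sends most traffic to the current model.
variants = [
    make_variant("Current", "DEMO-model-a", weight=9.0),
    make_variant("Candidate", "DEMO-model-b", weight=1.0),
]
shares = traffic_shares(variants)
print(shares)  # {'Current': 0.9, 'Candidate': 0.1}
```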

[ ]:
import time

timestamp = time.strftime("-%Y-%m-%d-%H-%M-%S", time.gmtime())
endpoint_config_name = job_name_prefix + "-epc-" + timestamp
endpoint_config_response = sage.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "InstanceType": "ml.m4.xlarge",
            "InitialInstanceCount": 1,
            "ModelName": model_name,
            "VariantName": "AllTraffic",
        }
    ],
)

print("Endpoint configuration name: {}".format(endpoint_config_name))
print("Endpoint configuration arn:  {}".format(endpoint_config_response["EndpointConfigArn"]))

Create Endpoint

Next, the customer creates the endpoint that serves up the model by specifying the name and configuration defined above. The end result is an endpoint that can be validated and incorporated into production applications.

[ ]:
import time

timestamp = time.strftime("-%Y-%m-%d-%H-%M-%S", time.gmtime())
endpoint_name = job_name_prefix + "-ep-" + timestamp
print("Endpoint name: {}".format(endpoint_name))

endpoint_params = {
    "EndpointName": endpoint_name,
    "EndpointConfigName": endpoint_config_name,
}
endpoint_response = sagemaker.create_endpoint(**endpoint_params)
print("EndpointArn = {}".format(endpoint_response["EndpointArn"]))

The endpoint is now being created. This may take a few minutes…

[ ]:
# get the status of the endpoint
response = sagemaker.describe_endpoint(EndpointName=endpoint_name)
status = response["EndpointStatus"]
print("EndpointStatus = {}".format(status))

# wait until the status has changed
sagemaker.get_waiter("endpoint_in_service").wait(EndpointName=endpoint_name)

# print the status of the endpoint
endpoint_response = sagemaker.describe_endpoint(EndpointName=endpoint_name)
status = endpoint_response["EndpointStatus"]
print("Endpoint creation ended with EndpointStatus = {}".format(status))

if status != "InService":
    raise Exception("Endpoint creation failed.")

If you see the message,

Endpoint creation ended with EndpointStatus = InService

then congratulations! You now have a functioning inference endpoint. You can confirm the endpoint configuration and status by navigating to the “Endpoints” tab in the Amazon SageMaker console.

We will finally create a runtime object from which we can invoke the endpoint.

Perform Inference

Finally, the customer can now validate the model for use. They can obtain the endpoint from the client library using the result from previous operations, and generate classifications from the trained model using that endpoint.

[ ]:
import boto3

runtime = boto3.Session().client(service_name="sagemaker-runtime")

Download test image

[ ]:
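import glob, shutil

file_name = "/tmp/test.jpg"
# The download call is missing in this copy; as a stand-in (an assumption), reuse a bathtub image synced earlier.
shutil.copy(sorted(glob.glob("/tmp/images/008.bathtub/*.jpg"))[0], file_name)

# test image
from IPython.display import Image

Image(file_name)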

Evaluate the image through the network for inference. The network outputs class probabilities, and typically one selects the class with the maximum probability as the final class output.

Note: The output class detected by the network may not be accurate in this example. To limit the time taken and cost of training, we have trained the model only for a couple of epochs. If the network is trained for more epochs (say 20), then the output class will be more accurate.

[ ]:
import json
import numpy as np

with open(file_name, "rb") as f:
    payload = f.read()
    payload = bytearray(payload)
response = runtime.invoke_endpoint(
    EndpointName=endpoint_name, ContentType="application/x-image", Body=payload
)
result = response["Body"].read()
# result will be in json format and convert it to ndarray
result = json.loads(result)
# the result will output the probabilities for all classes
# find the class with maximum probability and print the class index
index = np.argmax(result)
print("Result: label - " + object_categories[index] + ", probability - " + str(result[index]))
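The invocation steps can also be wrapped in a reusable function. The sketch below is one way to do that under stated assumptions: `classify_image` is a hypothetical helper, and `FakeRuntime` is a stand-in that returns a canned response so the example runs without AWS access. With the real client, the `runtime` object created earlier would be passed instead.

```python
import io
import json


def classify_image(runtime_client, endpoint_name, image_bytes, categories):
    """Send one image to the endpoint and return (label, probability) of the top class."""
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name, ContentType="application/x-image", Body=image_bytes
    )
    # The response body is a JSON list with one probability per class.
    probabilities = json.loads(response["Body"].read())
    index = max(range(len(probabilities)), key=probabilities.__getitem__)
    return categories[index], probabilities[index]


class FakeRuntime:
    """Stand-in for the runtime client; returns made-up probabilities for three classes."""

    def invoke_endpoint(self, EndpointName, ContentType, Body):
        return {"Body": io.BytesIO(json.dumps([0.2, 0.7, 0.1]).encode("utf-8"))}


label, probability = classify_image(
    FakeRuntime(), "hypothetical-endpoint", b"not-a-real-image", ["ak47", "bathtub", "bear"]
)
print(label, probability)  # bathtub 0.7
```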

Clean up

When we’re done with the endpoint, we can just delete it and the backing instances will be released. Run the following cell to delete the endpoint.

[ ]:
sage.delete_endpoint(EndpointName=endpoint_name)
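Deleting the endpoint releases the instances, but the endpoint configuration and the model are separate resources. A fuller cleanup could be sketched as follows. `clean_up` is a hypothetical helper, and `RecordingClient` is a stand-in that records calls instead of contacting AWS; the three delete calls and their keyword arguments are the ones the SageMaker client provides.

```python
def clean_up(client, endpoint_name, endpoint_config_name, model_name):
    """Delete the endpoint first, then the configuration and model it was built from."""
    client.delete_endpoint(EndpointName=endpoint_name)
    client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    client.delete_model(ModelName=model_name)


class RecordingClient:
    """Stand-in client that records calls so the sketch runs without AWS access."""

    def __init__(self):
        self.calls = []

    def delete_endpoint(self, EndpointName):
        self.calls.append(("delete_endpoint", EndpointName))

    def delete_endpoint_config(self, EndpointConfigName):
        self.calls.append(("delete_endpoint_config", EndpointConfigName))

    def delete_model(self, ModelName):
        self.calls.append(("delete_model", ModelName))


# Hypothetical resource names, for illustration only.
fake_client = RecordingClient()
clean_up(fake_client, "demo-ep", "demo-epc", "demo-model")
print([name for name, _ in fake_client.calls])
```

In the notebook, the call would be `clean_up(sage, endpoint_name, endpoint_config_name, model_name)`.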