JumpStart - Image Classification Benchmarking and Model Selection


This notebook’s CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.

[us-west-2 CI badge]



Welcome to Amazon SageMaker JumpStart! You can use JumpStart to solve many machine learning tasks with one click in SageMaker Studio, or through the SageMaker JumpStart API. ***

This demo notebook demonstrates how to use the SageMaker JumpStart API to perform large-scale image classification model selection or benchmarking tasks. The SageMaker JumpStart model hub provides access to many image classification models that enable transfer learning and fine-tuning on custom datasets. Many types of models have been developed for the image classification task, and the SageMaker JumpStart model hub contains many of these popular model architectures, including Residual Networks (ResNet), MobileNet, EfficientNet, Inception, Neural Architecture Search Networks (NASNet), Big Transfer (BiT), Shifted Window (Swin) Transformers, Class-Attention in Image Transformers (CaiT), and Data-Efficient Image Transformers (DeiT). These architectures differ substantially in their internal structure. For instance, ResNet models utilize skip connections to allow for substantially deeper networks, while transformer-based models use self-attention mechanisms that replace the intrinsic locality of convolution operations with more global receptive fields. In addition to the diverse feature sets that these different structures provide, each model architecture has several model configurations that adjust the model size, shape, and complexity within that architecture. This results in hundreds of unique image classification models available on the SageMaker JumpStart model hub.

A machine learning practitioner may therefore ask questions like: “what model should I deploy to achieve the best performance on my dataset?” And a machine learning researcher may ask questions like: “how can I produce my own fair comparison of multiple model architectures against a specified dataset while controlling training hyperparameters and machine architectures?” The former question addresses model selection across model architectures while the latter question concerns benchmarking trained models against a test dataset.

This notebook demonstrates a methodology to perform either model selection or benchmarking tasks with the SageMaker JumpStart API by asynchronously launching SageMaker Automatic Model Tuning jobs.


  1. Set Up

  2. Identify models and datasets

  3. Create helper functions for training

  4. Asynchronously execute hyperparameter tuning jobs

  5. Analyze results ***

1. Set Up


First, perform necessary imports. ***

[ ]:
import concurrent.futures as cf
import json
import itertools
import queue
import time
import traceback
from pathlib import Path
from pprint import pprint
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple

import boto3
import pandas as pd
import sagemaker
import sagemaker.hyperparameters
import sagemaker.model_uris
import sagemaker.script_uris
import sagemaker.image_uris
from botocore.config import Config
from sagemaker.tuner import CategoricalParameter

Now you can identify the top-level constants and adjustable parameters that will be utilized throughout this notebook. These values are gathered at the top of this notebook, so commonly needed adjustments can be made in one place.

The first set of constants control SageMaker training job behaviors:

- SM_TRAINING_INSTANCE_TYPE: EC2 instance type used for training. The default here is set to ml.p3.2xlarge. If your SageMaker service quota does not support this many simultaneous training jobs, please do one of the following: 1) request an increase to your SageMaker service quota, 2) adjust the constants SM_AMT_MAX_PARALLEL_TRAINING_JOBS_PER_TUNER and SM_AMT_MAX_PARALLEL_TUNING_JOBS to comply with your SageMaker service quota, or 3) change this instance type to an instance with a larger default SageMaker service quota, such as ml.m5.4xlarge.
- SM_AMT_MAX_JOBS: Maximum total number of training jobs to start per hyperparameter tuning job.
- SM_AMT_MAX_PARALLEL_TRAINING_JOBS_PER_TUNER: Maximum number of parallel training jobs per hyperparameter tuning job.
- SM_AMT_MAX_PARALLEL_TUNING_JOBS: Maximum number of parallel hyperparameter tuning jobs. The resulting maximum number of parallel training jobs is SM_AMT_MAX_PARALLEL_TRAINING_JOBS_PER_TUNER * SM_AMT_MAX_PARALLEL_TUNING_JOBS. Please ensure this quantity is less than your SageMaker Training service quota for the specified instance type in your current region.
- SM_AMT_OBJECTIVE_METRIC_NAME: Name of the metric used to evaluate training jobs during model selection. This value is used for early stopping criteria and hyperparameter tuning model selection.
- SM_AMT_OBJECTIVE_TYPE: The type of the objective metric for evaluating training jobs. This value can be either ‘Minimize’ or ‘Maximize’.
- SM_SESSION: SageMaker Session object with custom configuration to mitigate SDK rate exceeded and throttling exceptions.
- SM_AMT_HYPERPARAMETER_RANGE_LAMBDA_DICT: Dictionary of callables that accept a hyperparameter dictionary and return a sagemaker.parameter.ParameterRange. These parameter ranges can be one of three types: Continuous, Integer, or Categorical. The keys of the dictionary are the hyperparameter names, and the values are callables that, given a single hyperparameter dictionary argument, produce the appropriate parameter range class. This allows AMT hyperparameter range definitions to depend on default model hyperparameter values.
- SM_TRAINING_METRIC_DEFINITIONS: A list of dictionaries that defines the metric(s) used to evaluate the training jobs. Each dictionary contains two keys: ‘Name’ for the name of the metric, and ‘Regex’ for the regular expression used to extract the metric from the logs. SageMaker JumpStart provides many pre-implemented extractable metrics.

The next set of constants control the behavior of the training script:

- HYPERPARAMETERS: Set of hyperparameters overriding any default built-in value.

Finally, this notebook provides features to re-attach previously launched training jobs and load previously saved metrics for further analysis. The following constants control this behavior:

- SAVE_TUNING_JOB_NAMES_FILE_PATH: Path of the JSON Lines file that keeps track of the tuning job name associated with a unique model name and dataset name.
- SAVE_METRICS_FILE_PATH: Path of the JSON Lines file that records metrics associated with each tuning job. ***

[ ]:
SM_TRAINING_INSTANCE_TYPE = "ml.p3.2xlarge"
SM_AMT_MAX_JOBS = 2
SM_AMT_MAX_PARALLEL_TRAINING_JOBS_PER_TUNER = 2
SM_AMT_MAX_PARALLEL_TUNING_JOBS = 8
SM_AMT_OBJECTIVE_METRIC_NAME = "val_accuracy"
SM_AMT_OBJECTIVE_TYPE = "Maximize"
SM_SESSION = sagemaker.Session(
    sagemaker_client=boto3.client(
        "sagemaker",
        config=Config(connect_timeout=5, read_timeout=60, retries={"max_attempts": 20}),
    )
)
SM_AMT_HYPERPARAMETER_RANGE_LAMBDA_DICT = {
    "learning_rate": lambda x: CategoricalParameter(
        [float(x["learning_rate"]), float(x["learning_rate"]) / 5]
    )
}
_METRICS_MULTICLASS = ("top_5_accuracy",)
_METRICS_BINARY = ("precision", "recall", "auc", "prc")
_METRICS = ("accuracy", "loss", *_METRICS_MULTICLASS, *_METRICS_BINARY)
_NAME, _RE = "Name", "Regex"
SM_TRAINING_METRIC_DEFINITIONS = [
    *({_NAME: f"train_{metric}", _RE: f"- {metric}: ([0-9\\.]+)"} for metric in _METRICS),
    *({_NAME: f"val_{metric}", _RE: f"- val_{metric}: ([0-9\\.]+)"} for metric in _METRICS),
    *({_NAME: f"test_{metric}", _RE: f"- Test {metric}: ([0-9\\.]+)"} for metric in _METRICS),
    {_NAME: "num_params", _RE: "- Number of parameters: ([0-9\\.]+)"},
    {_NAME: "num_trainable_params", _RE: "- Number of trainable parameters: ([0-9\\.]+)"},
    {_NAME: "num_non_trainable_params", _RE: "- Number of non-trainable parameters: ([0-9\\.]+)"},
    {_NAME: "train_duration", _RE: "- Total training duration: ([0-9\\.]+)"},
    {_NAME: "train_epoch_duration", _RE: "- Average training duration per epoch: ([0-9\\.]+)"},
    {_NAME: "test_evaluation_latency", _RE: "- Test evaluation latency: ([0-9\\.]+)"},
    {_NAME: "test_latency_per_sample", _RE: "- Average test latency per sample: ([0-9\\.]+)"},
    {_NAME: "test_throughput", _RE: "- Average test throughput: ([0-9\\.]+)"},
]

HYPERPARAMETERS = {
    "epochs": 5,
    "early_stopping": "True",
    "early_stopping_patience": 3,
    "early_stopping_min_delta": 0.0,
}

SAVE_TUNING_JOB_NAMES_FILE_PATH = Path.cwd() / "benchmarking_tuning_job_names.jsonl"
SAVE_METRICS_FILE_PATH = Path.cwd() / "benchmarking_metrics.jsonl"
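
As noted above, the range callables can produce any of the three AMT parameter range types. The following sketch is illustrative only (the batch_size entry is an assumed example and this dictionary is not used elsewhere in this notebook); it shows how the same callable pattern extends to Continuous and Integer ranges:

from sagemaker.tuner import ContinuousParameter, IntegerParameter

ALTERNATIVE_HYPERPARAMETER_RANGE_LAMBDA_DICT = {
    # Search a log-scaled interval around the model's default learning rate
    "learning_rate": lambda x: ContinuousParameter(
        float(x["learning_rate"]) / 10, float(x["learning_rate"]) * 10, scaling_type="Logarithmic"
    ),
    # Search integer batch sizes between 8 and 64 (assumed example hyperparameter)
    "batch_size": lambda x: IntegerParameter(8, 64),
}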

2. Identify models and datasets


In this section, you will define two tuples, MODELS and DATASETS, which contain unique identifiers for all models and all datasets you wish to perform this benchmarking task on. Hyperparameter tuning jobs will be instantiated for every combination in the Cartesian product of these two tuples.


First, you will identify all built-in image classification model IDs to run this benchmarking task on. Because SageMaker JumpStart maintains many models for this task, the default code in this notebook identifies only a few models by model ID. You can run a thorough benchmarking or model selection analysis on all TensorFlow image classification models made available by SageMaker Built-In Algorithms via:

from sagemaker.jumpstart.notebook_utils import list_jumpstart_models
from sagemaker.jumpstart.filters import And

filter_value = And("task == ic", "framework == tensorflow")
MODELS = list_jumpstart_models(filter=filter_value)

This may be desired if you have a unique dataset and would like to perform large-scale benchmarking or model selection tasks on your custom dataset. However, please be cautious as a benchmarking task with this many models will require the deployment of many resources. ***

[ ]:
MODELS = (
    "tensorflow-ic-swin-base-patch4-window7-224",
    "tensorflow-ic-efficientnet-v2-imagenet21k-ft1k-m",
    "tensorflow-ic-imagenet-mobilenet-v3-large-100-224",
    "tensorflow-ic-imagenet-inception-v3-classification-4",
    "tensorflow-ic-resnet-50-classification-1",
    "tensorflow-ic-imagenet-nasnet-mobile",
)

You also need to identify the datasets to perform benchmarking on. Unlike built-in models, there is no API to query available datasets on S3. You may also have your own dataset hosted on S3 that you would like to benchmark. The following data structures provide a consistent framework to define dataset locations for the scope of this notebook. This is important because the benchmarking task is most beneficial with a training/validation/test dataset split. While possible, it is not recommended to let the model transfer learning script perform this split. Fitting a SageMaker Estimator requires channel definitions, which these objects create automatically via the S3DatasetSplit.channels method.

Notes on dataset channel behaviors: Training will utilize only the data provided in the “training” channel, model selection across hyperparameters and epochs will use data in the “validation” channel, and the final evaluation of model performance will be based on data provided in the “test” channel. If a “test” channel is not provided, then training should complete successfully, but metric definitions with a name matching the pattern “test_*” will not be available in the training job logs. If a “validation” channel is not provided, then the default behavior of the SageMaker JumpStart TensorFlow image classification algorithm is to perform a split of the “training” channel dataset into training and validation datasets. ***

[ ]:
class S3Dataset:
    def __init__(self, bucket: str, prefix: str) -> None:
        self.bucket = bucket
        self.prefix = prefix

    def path(self) -> str:
        return f"s3://{self.bucket}/{self.prefix}"


class S3DatasetSplit:
    def __init__(
        self,
        name: str,
        train: S3Dataset,
        validation: Optional[S3Dataset] = None,
        test: Optional[S3Dataset] = None,
        hyperparameters: Optional[Dict[str, str]] = None,
    ) -> None:
        self.name = name
        self.train = train
        self.validation = validation
        self.test = test
        self.hyperparameters = hyperparameters

    @classmethod
    def from_prefixes(
        cls,
        name: str,
        bucket: str,
        prefix_train: str,
        prefix_validation: Optional[str] = None,
        prefix_test: Optional[str] = None,
        hyperparameters: Optional[Dict[str, str]] = None,
    ) -> "S3DatasetSplit":
        train = S3Dataset(bucket, prefix_train)
        validation = S3Dataset(bucket, prefix_validation) if prefix_validation is not None else None
        test = S3Dataset(bucket, prefix_test) if prefix_test is not None else None
        return cls(name, train, validation, test, hyperparameters)

    def channels(self) -> Dict[str, str]:
        res = {"training": self.train.path()}
        if self.validation is not None:
            res["validation"] = self.validation.path()
        if self.test is not None:
            res["test"] = self.test.path()
        return res
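
To see how these classes feed an Estimator, the following minimal sketch (using a hypothetical bucket and prefixes) builds a split without a test set and prints the resulting channel dictionary; note the absent “test” key, which is why the test_* metrics described above would be unavailable for such a dataset:

example_split = S3DatasetSplit.from_prefixes(
    name="my-dataset",
    bucket="my-example-bucket",  # hypothetical bucket
    prefix_train="datasets/my-dataset/train/",
    prefix_validation="datasets/my-dataset/val/",
)
print(example_split.channels())
# {'training': 's3://my-example-bucket/datasets/my-dataset/train/',
#  'validation': 's3://my-example-bucket/datasets/my-dataset/val/'}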

Next, a dictionary of available datasets is created and one of these datasets is selected to perform analysis. To get a feel for the performance of different models with respect to different datasets, simply run this notebook for a different selected list of datasets! If you have your own dataset, just create a new entry that specifies the bucket along with prefixes for the train, validation, and test datasets. The dataset should be structured according to the built-in algorithm training data input format.

Two example datasets are populated. Both are small datasets used here for demonstration purposes. One of these datasets, ants-and-bees, has only two class labels, ants and bees, allowing you to train the models with the binary_mode hyperparameter set to True. With this setting, the SageMaker JumpStart built-in algorithm will create a model that returns a single probability number for the positive class and can use additional eval_metric options. ***

[ ]:
DATASET_DICT = {
    "tf-flowers": S3DatasetSplit.from_prefixes(
        name="tf-flowers",
        bucket=f"jumpstart-cache-prod-{SM_SESSION.boto_region_name}",
        prefix_train="training-datasets/cv/tf-flowers-split/train/",
        prefix_validation="training-datasets/cv/tf-flowers-split/val/",
        prefix_test="training-datasets/cv/tf-flowers-split/test/",
    ),
    "ants-and-bees": S3DatasetSplit.from_prefixes(
        name="ants-and-bees",
        bucket=f"jumpstart-cache-prod-{SM_SESSION.boto_region_name}",
        prefix_train="training-datasets/cv/ants-and-bees-split/train/",
        prefix_validation="training-datasets/cv/ants-and-bees-split/val/",
        prefix_test="training-datasets/cv/ants-and-bees-split/test/",
        hyperparameters={"binary_mode": "True"},
    ),
}

# This tuple specifies which datasets will be used
DATASETS = (
    "tf-flowers",
    "ants-and-bees",
)
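
If you would like to confirm the expected dataset layout before launching any jobs, a minimal sketch like the following (assuming your execution role is permitted to list the public JumpStart cache bucket in your region) prints a few object keys under the selected training prefix; the keys should reflect one sub-directory per class label containing the images:

# List a handful of objects under the first dataset's training prefix
s3_client = boto3.client("s3")
example_dataset = DATASET_DICT[DATASETS[0]]
response = s3_client.list_objects_v2(
    Bucket=example_dataset.train.bucket, Prefix=example_dataset.train.prefix, MaxKeys=5
)
for obj in response.get("Contents", []):
    print(obj["Key"])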

3. Create helper functions for training


This section contains a variety of helper functions that will be utilized for this SageMaker TensorFlow image classification benchmarking task, including functions to:

  1. create a SageMaker Estimator object from the JumpStart model hub

  2. create a SageMaker HyperparameterTuner for a specified model from the JumpStart model hub

  3. re-attach a SageMaker HyperparameterTuner if a tuning job has already started

  4. extract metrics from Estimator logs

  5. save tuning job information to file to enable re-attaching jobs in new sessions

  6. save resulting benchmarking metrics to file


The following block contains a helper function to obtain a SageMaker Estimator for a given JumpStart built-in model_id. This includes obtaining the appropriate URIs for the training docker image, the training script tarball, and the pre-trained model tarball to further fine-tune. This retrieval is provided by the SageMaker JumpStart built-in algorithms and allows for the creation of a SageMaker Estimator instance directly from these URIs. ***

[ ]:
def create_jumpstart_estimator(
    model_id: str,
    role: str,
    job_name: str,
    s3_output_location: str,
    model_version: str,
    instance_type: str = SM_TRAINING_INSTANCE_TYPE,
    metric_definitions: Optional[List[Dict[str, str]]] = None,
    hyperparameters: Optional[Dict[str, str]] = None,
) -> sagemaker.estimator.Estimator:
    """Obtain a SageMaker Estimator for a given model ID."""

    # Retrieve the docker image
    train_image_uri = sagemaker.image_uris.retrieve(
        region=None,
        framework=None,
        model_id=model_id,
        model_version=model_version,
        image_scope="training",
        instance_type=instance_type,
    )

    # Retrieve the training script
    train_source_uri = sagemaker.script_uris.retrieve(
        model_id=model_id, model_version=model_version, script_scope="training"
    )

    # Retrieve the pre-trained model tarball to further fine-tune
    train_model_uri = sagemaker.model_uris.retrieve(
        model_id=model_id, model_version=model_version, model_scope="training"
    )

    # Create and return SageMaker Estimator instance
    return sagemaker.estimator.Estimator(
        role=role,
        image_uri=train_image_uri,
        source_dir=train_source_uri,
        model_uri=train_model_uri,
        entry_point="transfer_learning.py",
        instance_count=1,
        instance_type=instance_type,
        max_run=360000,
        hyperparameters=hyperparameters,
        output_path=s3_output_location,
        base_job_name=job_name,
        metric_definitions=metric_definitions,
        sagemaker_session=SM_SESSION,
    )

While you now have a means to create a SageMaker Estimator, default hyperparameter values may not be sufficient for the considered task. Therefore, to obtain the best benchmarking results, you provide this Estimator to a SageMaker hyperparameter tuning job. The following function uses create_jumpstart_estimator to obtain a SageMaker HyperparameterTuner for a given JumpStart built-in model_id with properties for this benchmarking task. Because tuning jobs have a 32-character name length limit and this benchmarking task can create many tuning jobs with similar (or identical) names after truncation, a unique_id is provided for each tuning job to enforce unique tuning job names. ***

[ ]:
def create_benchmarking_tuner(
    model_id: str,
    unique_id: int,
    session: sagemaker.session.Session = SM_SESSION,
    model_version: str = "*",
    hyperparameters: Optional[Dict[str, str]] = HYPERPARAMETERS,
    dataset_hyperparameters: Optional[Dict[str, str]] = None,
    hyperparameter_range_lambda_dict: Dict[str, Callable] = SM_AMT_HYPERPARAMETER_RANGE_LAMBDA_DICT,
    metric_definitions: List[Dict[str, str]] = SM_TRAINING_METRIC_DEFINITIONS,
) -> sagemaker.tuner.HyperparameterTuner:
    """Obtain a SageMaker HyperparameterTuner with properties for this benchmarking task.

    A unique ID is helpful to distinguish names of benchmarking jobs.
    """
    role = session.get_caller_identity_arn()
    output_bucket = session.default_bucket()
    output_prefix = "jumpstart-example-tf-ic-benchmarking"
    job_name = sagemaker.utils.name_from_base(
        f"bm-{unique_id}-{model_id.replace('tensorflow-ic-', '')}"
    )
    s3_output_location = f"s3://{output_bucket}/{output_prefix}/output"

    _hyperparameters = sagemaker.hyperparameters.retrieve_default(
        model_id=model_id, model_version=model_version
    )
    if dataset_hyperparameters is not None:
        _hyperparameters.update(dataset_hyperparameters)
    if hyperparameters is not None:
        _hyperparameters.update(hyperparameters)

    estimator = create_jumpstart_estimator(
        model_id,
        role,
        job_name,
        s3_output_location,
        model_version=model_version,
        metric_definitions=metric_definitions,
        hyperparameters=_hyperparameters,
    )

    hyperparameter_ranges = {
        name: func(_hyperparameters) for name, func in hyperparameter_range_lambda_dict.items()
    }

    tuner = sagemaker.tuner.HyperparameterTuner(
        estimator,
        SM_AMT_OBJECTIVE_METRIC_NAME,
        hyperparameter_ranges,
        metric_definitions,
        max_jobs=SM_AMT_MAX_JOBS,
        max_parallel_jobs=SM_AMT_MAX_PARALLEL_TRAINING_JOBS_PER_TUNER,
        objective_type=SM_AMT_OBJECTIVE_TYPE,
        base_tuning_job_name=job_name,
    )
    return tuner
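
Before launching the full sweep, you can optionally sanity-check the tuner construction for a single model. The following sketch (an optional step, not used elsewhere in this notebook) builds a tuner without launching any jobs, then prints the resolved static hyperparameters and the AMT search ranges:

# Build a tuner for the first model only; nothing is launched until fit() is called
inspection_tuner = create_benchmarking_tuner(MODELS[0], unique_id=999)
pprint(inspection_tuner.estimator.hyperparameters())
pprint(inspection_tuner.hyperparameter_ranges())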

With these helper functions established, you can now create a HyperparameterTuner object for each specified model. But what happens if there is an error or the kernel for this script is terminated? The hyperparameter tuning jobs would still run to completion, and you would not want to re-launch these jobs in order to obtain your results. Therefore, you need yet another helper function that will either re-attach the hyperparameter tuning job if it exists or create a new one via create_benchmarking_tuner. To accomplish this, the JSON Lines file specified in SAVE_TUNING_JOB_NAMES_FILE_PATH is read and checked for whether a tuning job already exists for model_name and dataset_name. If it does exist, then the job is re-attached and returned. Otherwise, a new tuner is created and its fit() method is invoked with the wait=False argument and channels specified by the previously defined S3DatasetSplit object that stores your dataset S3 locations. You will have the thread wait for this job to complete later, but you first need to put this job information on the queue_save_tuning_job queue, which signals the primary thread to append this job information to SAVE_TUNING_JOB_NAMES_FILE_PATH. Writing to this file must be done by the primary thread listener because having multiple threads write to a file simultaneously is not thread safe. ***

[ ]:
class JobInformation(NamedTuple):
    model_name: str
    dataset_name: str
    tuning_job_name: Optional[str] = None


def create_or_attach_tuner(
    model_name: str,
    dataset: S3DatasetSplit,
    unique_id: int,
    queue_save_tuning_job: queue.Queue,
    tuning_job_names_file_path: Path = SAVE_TUNING_JOB_NAMES_FILE_PATH,
    session: sagemaker.Session = SM_SESSION,
) -> Tuple[sagemaker.tuner.HyperparameterTuner, JobInformation]:
    if tuning_job_names_file_path.exists():
        tuning_jobs_df = pd.read_json(tuning_job_names_file_path, lines=True).set_index(
            ["model_name", "dataset_name"]
        )
        if (model_name, dataset.name) in tuning_jobs_df.index:
            tuning_job_name = tuning_jobs_df.loc[(model_name, dataset.name), "tuning_job_name"]
            tuner = sagemaker.tuner.HyperparameterTuner.attach(tuning_job_name, session)
            job_information = JobInformation(model_name, dataset.name, tuning_job_name)
            print(f"> Re-attached previous SageMaker tuning job, {job_information}")
            return tuner, job_information

    tuner = create_benchmarking_tuner(
        model_name, unique_id, dataset_hyperparameters=dataset.hyperparameters
    )
    tuner.fit(dataset.channels(), wait=False)
    tuning_job_name = tuner.latest_tuning_job.name
    job_information = JobInformation(model_name, dataset.name, tuning_job_name)
    queue_save_tuning_job.put(job_information)
    print(f"> Starting new SageMaker tuning job, {job_information}")
    return tuner, job_information

Once a tuning job is complete, you need to obtain a description of the best training job. While the objective metric for the hyperparameter tuner is easily obtained via the HyperparameterTuner.analytics() method, which returns a HyperparameterTuningJobAnalytics object, the additional auxiliary metrics provided to the original estimator are not extracted by this object. The following function probes the training job description to extract all metrics of interest to this benchmarking scenario from the key FinalMetricDataList. It also extracts the hyperparameters utilized from the key HyperParameters, which may be useful in identifying specific information about the training job. Refer to the SageMaker DescribeTrainingJob API for any additional keys you may want to extract. ***

[ ]:
def extract_job_summary_from_logs(
    tuner: sagemaker.tuner.HyperparameterTuner,
    job_information: JobInformation,
    session: sagemaker.Session = SM_SESSION,
) -> Dict[str, Any]:
    description = session.describe_training_job(tuner.best_training_job())
    metrics = {
        metric["MetricName"]: metric["Value"] for metric in description["FinalMetricDataList"]
    }
    hyperparameters = description["HyperParameters"]
    return {**metrics, **hyperparameters, **job_information._asdict()}

Next, you will define a function that runs a single tuning job. For a given model_id, this function will do three things: 1) obtain a HyperparameterTuner object for this model, 2) launch the hyperparameter tuning job and wait for the job to complete, and 3) extract the relevant metrics from Amazon CloudWatch Logs for this training job. Additionally, this function requests access to the queue_currently_running queue, which has a maximum capacity and will block without timeout until there is an available spot on the queue. This allows you to cap the number of simultaneously running hyperparameter tuning jobs. ***

[ ]:
def run_tuner(
    model_name: str,
    dataset: S3DatasetSplit,
    unique_id: int,
    queue_currently_running: queue.Queue,
    queue_save_tuning_job: queue.Queue,
) -> Dict[str, Any]:
    queue_currently_running.put(None)
    tuner, job_information = create_or_attach_tuner(
        model_name, dataset, unique_id, queue_save_tuning_job
    )
    tuner.wait()
    job_summary_dict = extract_job_summary_from_logs(tuner, job_information)
    print(f"> Completed SageMaker tuning job, {job_information}")
    return job_summary_dict

Finally, you need a couple of helper functions to log information to file. The first is intended to be triggered whenever the create_or_attach_tuner function puts job information onto the queue_save_tuning_job queue. Because this example uses multithreading, and it is not thread safe to have multiple threads write to a file simultaneously, the primary script listening to the futures threads passes job information to this function to be saved. The second helper function is intended to be called whenever a tuning job completes. It extracts the metrics from the return value of the future and writes a JSON line to the file. It also prints any exception generated by the future without re-raising it, so the remaining jobs can complete and a single job failure does not block subsequent analyses. ***

[ ]:
def append_tuning_job_to_file(
    job_information: JobInformation, file_path: Path = SAVE_TUNING_JOB_NAMES_FILE_PATH
) -> None:
    with open(file_path, "a+") as file:
        file.write(f"{json.dumps(job_information._asdict())}\n")
    print(f"> Saved job information to file, {job_information}")


def append_metrics_to_file(
    future: cf.Future,
    job_information: JobInformation,
    file_path: Path = SAVE_METRICS_FILE_PATH,
) -> None:
    try:
        metrics = future.result()
        with open(file_path, "a+") as file:
            file.write(f"{json.dumps(metrics)}\n")
        print(f"> Saved metrics to file, {job_information}")
    except Exception as exc:
        print(f"> Exception generated for {job_information}: {exc}")
        traceback.print_exc()

4. Asynchronously execute hyperparameter tuning jobs


Everything is now in place to launch training jobs and aggregate performance metrics for the benchmarking evaluation. This notebook makes use of the Python standard library’s concurrent.futures module, a high-level interface for asynchronously executing callables. The run_tuner function will be repeatedly executed on a thread pool, and the queue_currently_running queue will block any thread from launching additional training instances until the number of currently running tuning jobs is less than SM_AMT_MAX_PARALLEL_TUNING_JOBS. Note that this queue would not be necessary if a ProcessPoolExecutor were used in place of a ThreadPoolExecutor, but a process pool cannot share global state, and therefore calling the functions append_tuning_job_to_file and append_metrics_to_file would not be thread safe.

Once all jobs are submitted to the executor, this script listens to the futures job pool. Until all jobs are completed, it will perform two tasks: 1) call append_tuning_job_to_file with any job information that gets populated into queue_save_tuning_job, and 2) call append_metrics_to_file for any future that has finished execution.

FINAL NOTE: Depending on the number of models and datasets defined above, this block may take a long time to run and consume many resources. Please double-check your settings! ***
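
As a quick check before running the next cell, the following sketch estimates the scale of the sweep from the constants defined earlier in this notebook:

total_tuning_jobs = len(MODELS) * len(DATASETS)
total_training_jobs = total_tuning_jobs * SM_AMT_MAX_JOBS
max_simultaneous_instances = (
    SM_AMT_MAX_PARALLEL_TUNING_JOBS * SM_AMT_MAX_PARALLEL_TRAINING_JOBS_PER_TUNER
)
print(
    f"{total_tuning_jobs} tuning jobs -> up to {total_training_jobs} training jobs, "
    f"at most {max_simultaneous_instances} simultaneous {SM_TRAINING_INSTANCE_TYPE} instances"
)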

[ ]:
if SAVE_METRICS_FILE_PATH.exists():
    SAVE_METRICS_FILE_PATH.unlink()

queue_save_tuning_job = queue.Queue()
queue_currently_running = queue.Queue(maxsize=SM_AMT_MAX_PARALLEL_TUNING_JOBS)

jobs = itertools.product(MODELS, DATASETS)

with cf.ThreadPoolExecutor(max_workers=SM_AMT_MAX_PARALLEL_TUNING_JOBS) as executor:
    futures_to_job_information = {}
    for unique_id, (model_name, dataset_name) in enumerate(jobs):
        dataset = DATASET_DICT[dataset_name]
        future = executor.submit(
            run_tuner,
            model_name,
            dataset,
            unique_id,
            queue_currently_running,
            queue_save_tuning_job,
        )
        futures_to_job_information[future] = JobInformation(model_name, dataset_name)

    while futures_to_job_information:
        done, not_done = cf.wait(
            futures_to_job_information, timeout=5.0, return_when=cf.FIRST_COMPLETED
        )

        while not queue_save_tuning_job.empty():
            job_information = queue_save_tuning_job.get()
            append_tuning_job_to_file(job_information)

        for future in done:
            queue_currently_running.get()
            job_information_before_execution = futures_to_job_information.pop(future)
            append_metrics_to_file(future, job_information_before_execution)

5. Analyze results


At this point, all tuning jobs should have completed execution. Congratulations! Please check the file SAVE_METRICS_FILE_PATH to verify that each job has appended a JSON object as a new line in the file. You can read the contents of this file into a Pandas DataFrame to view results in tabular form. The following block reads the saved metrics into a DataFrame, then cleans the model_name column and adds a model_category column indicating the model architecture.

Packages are re-imported in this section so you can perform analyses at a later time or in a different session given the saved JSON Lines file without re-running this whole notebook. You may need to manually re-define the constant SAVE_METRICS_FILE_PATH here. ***
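
For example, a standalone analysis session could re-create that constant as follows (a sketch assuming the default file name from the setup section; adjust it if you changed SAVE_METRICS_FILE_PATH above):

from pathlib import Path

SAVE_METRICS_FILE_PATH = Path.cwd() / "benchmarking_metrics.jsonl"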

[ ]:
import pandas as pd


metrics_df = pd.read_json(SAVE_METRICS_FILE_PATH, lines=True)

print("Available columns in metrics_df are as follows:", metrics_df.columns.tolist())
[ ]:
def model_name_clean(model_name: str):
    model_name = model_name.replace("tensorflow-ic-", "")
    model_name = model_name.replace("imagenet-", "")
    model_name = model_name.split("-classification")[0]
    return model_name


metrics_df["model_name"] = metrics_df["model_name"].apply(model_name_clean)
metrics_df["model_category"] = metrics_df["model_name"].apply(
    lambda x: x.replace("tf2-preview-", "").split("-")[0]
)

index_columns = ["dataset_name", "model_category", "model_name"]
display_columns = [name for name in metrics_df.columns if "test_" in name]
display(metrics_df.sort_values(by=index_columns).set_index(index_columns)[display_columns])

With a Pandas DataFrame of all performance metrics populated, you can easily create tables of interest. For example, here the table is pivoted to display models as rows and performance for different datasets as columns. ***

[ ]:
pd.set_option("display.max_rows", 500)
index_columns = ["model_category", "model_name"]
columns = ["dataset_name"]
value_columns = ["test_accuracy", "test_evaluation_latency", "train_duration"]
display(metrics_df.pivot(index=index_columns, columns=columns, values=value_columns).round(3))

Finally, you can create a plotly figure illustrating the Pareto front tradeoff between test accuracy and throughput. If using JupyterLab, be sure to enable the plotly Jupyter extension for the best viewing experience. ***

[ ]:
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objs as go


def benchmarking_figure(
    df: pd.DataFrame,
    dataset_name: str,
    x: str = "test_throughput",
    y: str = "test_accuracy",
    title: str = "SageMaker JumpStart TensorFlow Image Classification Benchmarking",
    model_name: str = "model_name",
    xaxis_title: str = "throughput (images per second)",
    yaxis_title: str = "test accuracy",
    size: str = "num_params",
    color: str = "model_category",
    width: int = 800,
    height: int = 600,
) -> go.Figure:

    df[f"sqrt_{size}"] = np.sqrt(df[size])
    df = df.sort_values(by=[model_name])
    df = df[df["dataset_name"] == dataset_name]

    fig = px.scatter(
        df,
        x=x,
        y=y,
        color=color,
        size=f"sqrt_{size}",
        title=f"{title} ({dataset_name})",
        hover_name=model_name,
        log_x=True,
        width=width,
        height=height,
    )
    fig.update_layout(
        xaxis_title=xaxis_title,
        yaxis_title=yaxis_title,
    )
    return fig
[ ]:
for dataset_name in metrics_df["dataset_name"].unique():
    fig = benchmarking_figure(metrics_df, dataset_name)
    fig.write_html(f"jumpstart_tf_ic_benchmarking_pareto_{dataset_name}.html")
    fig.show()

Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.

[CI badges: us-east-1, us-east-2, us-west-1, ca-central-1, sa-east-1, eu-west-1, eu-west-2, eu-west-3, eu-central-1, eu-north-1, ap-southeast-1, ap-southeast-2, ap-northeast-1, ap-northeast-2, ap-south-1]