Computer Vision for Medical Imaging

This notebook showcases techniques and services offered by SageMaker to build a model which predicts if an image of cells contains cancer. This notebook shows how to build a model using hyperparameter tuning.

Dataset

The dataset for this demo comes from the Camelyon16 Challenge made available under the CC0 license. The raw data provided by the challenge has been processed into 96x96 pixel tiles by Bas Veeling and also made available under the CC0 license. For detailed information on each dataset please see the papers below: * Ehteshami Bejnordi et al. Diagnostic Assessment of Deep Learning Algorithms for Detection of Lymph Node Metastases in Women With Breast Cancer. JAMA: The Journal of the American Medical Association, 318(22), 2199–2210. doi:jama.2017.14585 * B. S. Veeling, J. Linmans, J. Winkens, T. Cohen, M. Welling. “Rotation Equivariant CNNs for Digital Pathology”. arXiv:1806.03962

The tiled dataset from Bas Veeling is over 6GB of data. In order to easily run this demo, the dataset has been pruned to the first 14,000 images of the tiled dataset and comes included in the repo with this notebook for convenience.

Update Sagemaker SDK and Boto3

NOTE You may get an error from pip’s dependency resolver; you can ignore this error.

[ ]:
import subprocess
import sys


def import_or_install(package):
    """Import *package* by name, installing it with pip first if missing.

    Uses the current interpreter's pip (``python -m pip``) instead of the
    IPython ``!pip`` shell magic, so this cell is also valid plain Python.
    """
    try:
        __import__(package)
    except ImportError:
        # Mirror the best-effort behavior of `!pip install`: do not raise if
        # the install fails — the eventual real `import` will surface errors.
        subprocess.call([sys.executable, "-m", "pip", "install", package])


# NOTE(review): for distributions whose pip name differs from the import name
# (e.g. "opencv-python" imports as "cv2"), __import__ always fails and the
# pip-install branch runs even when the package is already present.
required_packages = ["sagemaker", "boto3", "h5py", "tqdm", "matplotlib", "opencv-python"]

for package in required_packages:
    import_or_install(package)

Import Libraries

[ ]:
import io
import os
import h5py
import zipfile
import boto3
import sagemaker
import mxnet as mx
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import cv2
from datetime import datetime

from inference_specification import InferenceSpecification

Configure Boto3 Clients and Sessions

[ ]:
# Region and sessions used by every AWS call in this notebook.
region = "us-west-2"  # Change region as needed
boto3.setup_default_session(region_name=region)
boto_session = boto3.Session(region_name=region)

s3_client = boto3.client("s3", region_name=region)

sagemaker_boto_client = boto_session.client("sagemaker")
sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session, sagemaker_client=sagemaker_boto_client
)
sagemaker_role = sagemaker.get_execution_role()

# Reuse the session configured above instead of building a second,
# default-configured sagemaker.Session() just to resolve the bucket name.
bucket = sagemaker_session.default_bucket()

Part 1: Prepare Dataset

Load Dataset

[ ]:
# create a local "data" directory for the dataset (skip if it already exists)
if not os.path.isdir("data"):
    os.mkdir("data")

# download zip file from public s3 bucket
# NOTE: `!wget` is IPython shell magic — this cell only runs inside a notebook
!wget -P data https://sagemaker-sample-files.s3.amazonaws.com/datasets/image/pcam/medical_images.zip
[ ]:
# Extract the dataset archive, then load the image tiles and labels into memory.
with zipfile.ZipFile("data/medical_images.zip") as zf:
    zf.extractall()

# Open the HDF5 file directly with h5py and close it deterministically
# (the same pattern used when the file is re-read later in the notebook).
with h5py.File("data/camelyon16_tiles.h5", "r") as f:
    # [()] materializes the full dataset as a numpy array
    X = f["x"][()]
    y = f["y"][()]

print("Shape of X:", X.shape)
print("Shape of y:", y.shape)
[ ]:
# write to session s3 bucket
# keeps a durable copy of the raw archive next to the notebook's other S3 data
s3_client.upload_file("data/medical_images.zip", bucket, f"data/medical_images.zip")
[ ]:
# delete local copy of the archive now that it is extracted and backed up in S3.
# EAFP: attempt the removal and handle the missing-file case directly;
# `os` is already imported at the top of the notebook.
try:
    os.remove("data/medical_images.zip")
except FileNotFoundError:
    print("The file does not exist")

View Sample Images from Dataset

[ ]:
def preview_images(X, y, n, cols):
    """Plot the first *n* images of *X* in a grid with *cols* columns.

    Each subplot is titled with its label from *y*. Surplus axes in the last
    row (when *cols* does not divide *n*) are hidden instead of indexing past
    the end of the sample arrays, which previously raised IndexError.
    """
    sample_images = X[:n]
    sample_labels = y[:n]

    rows = int(np.ceil(n / cols))
    fig, axs = plt.subplots(rows, cols, figsize=(11.5, 7))

    # np.atleast_1d covers the rows == cols == 1 case, where plt.subplots
    # returns a bare Axes object rather than an array of axes.
    for i, ax in enumerate(np.atleast_1d(axs).flatten()):
        ax.axis("off")
        if i < n:
            ax.imshow(sample_images[i])
            ax.set_title(f"Label: {sample_labels[i]}")

    plt.tight_layout()


preview_images(X, y, 15, 5)

Shuffle and Split Dataset

[ ]:
from sklearn.model_selection import train_test_split

# Materialize plain numpy copies of the arrays before splitting.
X_numpy = X[:]
y_numpy = y[:]

# Hold out 1,000 images for final testing, then carve 2,000 validation images
# out of the remaining training data. Fixed random_state values make both
# splits reproducible across runs (the test split is recreated in Part 4).
X_train, X_test, y_train, y_test = train_test_split(
    X_numpy, y_numpy, test_size=1000, random_state=0
)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=2000, random_state=1)

print(X_train.shape)
print(X_val.shape)
print(X_test.shape)

Convert Splits to RecordIO Format

[ ]:
def write_to_recordio(X: np.ndarray, y: np.ndarray, prefix: str):
    """JPEG-encode every image in *X* with its label from *y* and append it
    to an indexed RecordIO pair: ``{prefix}.rec`` plus its ``{prefix}.idx``."""
    record = mx.recordio.MXIndexedRecordIO(idx_path=f"{prefix}.idx", uri=f"{prefix}.rec", flag="w")
    for i, image in enumerate(tqdm(X)):
        # IRHeader(flag, label, id, extra) — record id doubles as the index key
        header = mx.recordio.IRHeader(0, y[i], i, 0)
        packed = mx.recordio.pack_img(header, image, quality=95, img_fmt=".jpg")
        record.write_idx(i, packed)
    record.close()
[ ]:
# serialize each data split to indexed RecordIO files under data/
write_to_recordio(X_train, y_train, prefix="data/train")
write_to_recordio(X_val, y_val, prefix="data/val")
write_to_recordio(X_test, y_test, prefix="data/test")

Upload Data Splits to S3

[ ]:
# Timestamped S3 prefix keeps each run's data and training jobs separate.
prefix = "cv-metastasis-{}".format(datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))

# NOTE(review): CreateBucketConfiguration must be omitted for us-east-1;
# fine as written for the default us-west-2 region.
try:
    s3_client.create_bucket(
        Bucket=bucket, ACL="private", CreateBucketConfiguration={"LocationConstraint": region}
    )
    print(f"Created S3 bucket: {bucket}")

except s3_client.exceptions.ClientError as e:
    # The bucket normally already exists (it is the session default bucket).
    # Catching ClientError (not bare Exception) guarantees `.response` is
    # present; bare `raise` preserves the original traceback.
    if e.response["Error"]["Code"] == "BucketAlreadyOwnedByYou":
        print(f"Using existing bucket: {bucket}")
    else:
        raise
[ ]:
# upload the RecordIO splits to the channel-specific prefixes the training job reads
s3_client.upload_file("data/train.rec", bucket, f"{prefix}/data/train/train.rec")
s3_client.upload_file("data/val.rec", bucket, f"{prefix}/data/val/val.rec")
s3_client.upload_file("data/test.rec", bucket, f"{prefix}/data/test/test.rec")

Part 2: Training the Model

Configure the Estimator

[ ]:
# Built-in SageMaker image-classification algorithm container for this region.
training_image = sagemaker.image_uris.retrieve("image-classification", region)
num_training_samples = X_train.shape[0]
num_classes = len(np.unique(y_train))

hyperparameters = {
    "num_layers": 18,
    "use_pretrained_model": 1,  # fine-tune from pretrained weights
    "augmentation_type": "crop_color_transform",
    "image_shape": "3,96,96",  # channels,height,width of the 96x96 tiles
    "num_classes": num_classes,
    "num_training_samples": num_training_samples,
    "mini_batch_size": 64,
    "epochs": 5,
    "learning_rate": 0.01,
    "precision_dtype": "float32",
}

estimator_config = {
    "hyperparameters": hyperparameters,
    "image_uri": training_image,
    # reuse the role resolved during session setup instead of calling
    # get_execution_role() a second time
    "role": sagemaker_role,
    "instance_count": 1,
    "instance_type": "ml.p3.2xlarge",
    "volume_size": 100,
    "max_run": 360000,
    "output_path": f"s3://{bucket}/{prefix}/training_jobs",
}

image_classifier = sagemaker.estimator.Estimator(**estimator_config)

Configure the Hyperparameter Tuner

Although we would prefer to tune for recall, the current HyperparameterTuner implementation for Image Classification only supports validation accuracy.

[ ]:
# Search over batch size and learning rate; categorical ranges keep the
# search space small (3 x 2 = 6 combinations, matching max_jobs below).
hyperparameter_ranges = {
    "mini_batch_size": sagemaker.parameter.CategoricalParameter([16, 32, 64]),
    "learning_rate": sagemaker.parameter.CategoricalParameter([0.001, 0.01]),
}

hyperparameter_tuner = sagemaker.tuner.HyperparameterTuner(
    estimator=image_classifier,
    objective_metric_name="validation:accuracy",  # only objective this algorithm supports
    hyperparameter_ranges=hyperparameter_ranges,
    max_jobs=6,
    max_parallel_jobs=2,  # run at most two training jobs at a time
    base_tuning_job_name=prefix,
)

Define the Data Channels

[ ]:
def _recordio_channel(split):
    """Build a Pipe-mode RecordIO TrainingInput for the given data split."""
    # Pipe mode streams the data from S3 instead of downloading it first.
    return sagemaker.inputs.TrainingInput(
        s3_data=f"s3://{bucket}/{prefix}/data/{split}",
        content_type="application/x-recordio",
        s3_data_type="S3Prefix",
        input_mode="Pipe",
    )


train_input = _recordio_channel("train")
val_input = _recordio_channel("val")

data_channels = {"train": train_input, "validation": val_input}

Run Hyperparameter Tuning Jobs

[ ]:
# Only launch a new tuning job if this kernel session has not already run one;
# `tuning_job_name` survives in the notebook namespace after a successful fit.
if "tuning_job_name" not in locals():
    hyperparameter_tuner.fit(inputs=data_channels)
    tuning_job_name = hyperparameter_tuner.describe().get("HyperParameterTuningJobName")
else:
    print(f"Using previous tuning job: {tuning_job_name}")

Examine Results

NOTE: If your kernel has restarted after running the hyperparameter tuning job, everything you need has been persisted to SageMaker. You can continue on without having to run the tuning job again.

[ ]:
# Fetch tuning-job analytics as a pandas DataFrame (one row per training job).
results = sagemaker.analytics.HyperparameterTuningJobAnalytics(tuning_job_name)
results_df = results.dataframe()
results_df
[ ]:
# Identify the best training job and persist its name across kernel restarts
# via the %store IPython magic.
best_training_job_summary = results.description()["BestTrainingJob"]
best_training_job_name = best_training_job_summary["TrainingJobName"]

%store best_training_job_name

Part 3: Retrieving and Saving the Model in SageMaker Lineage and SageMaker Model Registry

Examine Lineage

Though you already know the training job details from running the cells above, if we were just given the model uri, we could use SageMaker Lineage to retrieve the training job details which produced the model.

Data Lineage and Metrics for Best Model

[ ]:
from sagemaker.lineage import context, artifact, association, action

Training data artifact

[ ]:
# Re-derive the tuning results so this section is self-contained after a
# kernel restart (best_training_job_name was persisted with %store earlier).
results = sagemaker.analytics.HyperparameterTuningJobAnalytics(tuning_job_name)
results_df = results.dataframe()
best_training_job_summary = results.description()["BestTrainingJob"]
best_training_job_details = sagemaker_boto_client.describe_training_job(
    TrainingJobName=best_training_job_name
)
[ ]:
# Create (or reuse) one lineage DataSet artifact per input channel of the
# best training job.
data_artifact_list = []
for data_input in best_training_job_details["InputDataConfig"]:
    channel = data_input["ChannelName"]
    data_s3_uri = data_input["DataSource"]["S3DataSource"]["S3Uri"]

    existing = list(
        artifact.Artifact.list(source_uri=data_s3_uri, sagemaker_session=sagemaker_session)
    )

    if not existing:
        data_artifact = artifact.Artifact.create(
            artifact_name=channel,
            source_uri=data_s3_uri,
            artifact_type="DataSet",
            sagemaker_session=sagemaker_session,
        )
        print(f"Create artifact {data_artifact.artifact_arn}: SUCCESSFUL")
    else:
        data_artifact = existing[0]
        print(f"Using existing artifact: {data_artifact.artifact_arn}")
    data_artifact_list.append(data_artifact)

Model artifact

[ ]:
# Create (or reuse) the lineage Model artifact for the trained model archive.
trained_model_s3_uri = best_training_job_details["ModelArtifacts"]["S3ModelArtifacts"]

existing_models = list(
    artifact.Artifact.list(source_uri=trained_model_s3_uri, sagemaker_session=sagemaker_session)
)

if not existing_models:
    model_artifact = artifact.Artifact.create(
        artifact_name="TrainedModel",
        source_uri=trained_model_s3_uri,
        artifact_type="Model",
        sagemaker_session=sagemaker_session,
    )
    print(f"Create artifact {model_artifact.artifact_arn}: SUCCESSFUL")
else:
    model_artifact = existing_models[0]
    print(f"Using existing artifact: {model_artifact.artifact_arn}")

Set artifact associations

[ ]:
# Look up the trial component SageMaker auto-created for the best training
# job; its ARN is the destination for the artifact associations below.
trial_component = sagemaker_boto_client.describe_trial_component(
    TrialComponentName=best_training_job_summary["TrainingJobName"] + "-aws-training-job"
)
trial_component_arn = trial_component["TrialComponentArn"]

Store artifacts

[ ]:
# Associate each artifact with the training-job trial component:
# datasets "ContributedTo" the run, the model was "Produced" by it.
artifact_list = data_artifact_list + [model_artifact]

for artif in artifact_list:
    assoc = "ContributedTo" if artif.artifact_type == "DataSet" else "Produced"
    try:
        association.Association.create(
            source_arn=artif.artifact_arn,
            destination_arn=trial_component_arn,
            association_type=assoc,
            sagemaker_session=sagemaker_session,
        )
        print(f"Association with {artif.artifact_type}: SUCCESSFUL")
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still
        # propagate. NOTE(review): presumably only "already exists" errors are
        # expected here — consider catching the specific client error.
        print(f"Association already exists with {artif.artifact_type}")

Model Registry

You can also save your model in the model registry, which you can use to check and retrieve your model in the future

[ ]:
# One Model Package Group per notebook run, named after the S3 prefix.
mpg_name = prefix

model_packages = sagemaker_boto_client.list_model_packages(ModelPackageGroupName=mpg_name)[
    "ModelPackageSummaryList"
]

# Create the group only if it has no packages registered yet.
if model_packages:
    print(f"Using existing Model Package Group: {mpg_name}")
else:
    mpg_input_dict = {
        "ModelPackageGroupName": mpg_name,
        "ModelPackageGroupDescription": "Cancer metastasis detection",
    }

    mpg_response = sagemaker_boto_client.create_model_package_group(**mpg_input_dict)
    print(f"Create Model Package Group {mpg_name}: SUCCESSFUL")
[ ]:
# Register every tuning-job model as a package version in the group.
training_jobs = results_df["TrainingJobName"]

for job_name in training_jobs:
    job_data = sagemaker_boto_client.describe_training_job(TrainingJobName=job_name)
    model_uri = job_data.get("ModelArtifacts", {}).get("S3ModelArtifacts")
    training_image = job_data["AlgorithmSpecification"]["TrainingImage"]

    # Build the inference spec with the project-local helper, then point its
    # container at this job's model artifact.
    mp_inference_spec = InferenceSpecification().get_inference_specification_dict(
        ecr_image=training_image,
        supports_gpu=False,
        supported_content_types=["text/csv"],
        supported_mime_types=["text/csv"],
    )

    mp_inference_spec["InferenceSpecification"]["Containers"][0]["ModelDataUrl"] = model_uri
    mp_input_dict = {
        "ModelPackageGroupName": mpg_name,
        "ModelPackageDescription": "SageMaker Image Classifier",
        "ModelApprovalStatus": "PendingManualApproval",
    }

    mp_input_dict.update(mp_inference_spec)
    mp_response = sagemaker_boto_client.create_model_package(**mp_input_dict)

# List the registered packages (MaxResults matches the 6 tuning jobs).
model_packages = sagemaker_boto_client.list_model_packages(
    ModelPackageGroupName=mpg_name, MaxResults=6
)["ModelPackageSummaryList"]
model_packages

Part 4: Deploying the Model

Create Model from Existing Training Job Name for Deployment

We can use the name of the best training job from our hyperparameter tuning experiment and create its corresponding model.

[ ]:
# Timestamped name for the deployable model.
model_name = "metastasis-detection-{}".format(datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))
# NOTE(review): because model_name embeds the current timestamp, this lookup
# can never match a model from an earlier run — the existence check is
# effectively a no-op. Confirm whether a stable name was intended.
model_matches = sagemaker_boto_client.list_models(NameContains=model_name)["Models"]
training_image = sagemaker.image_uris.retrieve("image-classification", region)

if not model_matches:
    print(f"Creating model {model_name}")
    # Wrap the best training job's artifacts as a deployable SageMaker Model.
    sagemaker_session.create_model_from_job(
        name=model_name,
        training_job_name=best_training_job_summary["TrainingJobName"],
        role=sagemaker_role,
        image_uri=training_image,
    )
else:
    print(f"Model {model_name} already exists.")

Deploy Model using Data from Model Registry

As we saved data about the model in the Model Registry, we can look up details about the model and use them to deploy the model.

[ ]:
# Locate the registry entry for the best training job by its position in
# results_df.
# NOTE(review): this assumes model_packages is ordered the same way as the
# results_df rows — confirm, since list_model_packages sorts by creation
# time by default.
training_jobs = results_df["TrainingJobName"]
best_model_index = np.where(training_jobs.values == best_training_job_summary["TrainingJobName"])[
    0
][0]
best_model_info = sagemaker_boto_client.describe_model_package(
    ModelPackageName=model_packages[best_model_index]["ModelPackageArn"]
)
# Deploy using the container image, model data and first supported realtime
# instance type recorded in the model package.
best_model_container = best_model_info.get("InferenceSpecification").get("Containers")[0]
deploy_instance_type = best_model_info.get("InferenceSpecification").get(
    "SupportedRealtimeInferenceInstanceTypes"
)[0]

best_model = sagemaker.Model(
    image_uri=best_model_container.get("Image"),
    model_data=best_model_container.get("ModelDataUrl"),
    role=sagemaker.get_execution_role(),
    name=mpg_name,
)

# Endpoint is named after the model package group so the cleanup cell can find it.
best_model.deploy(
    initial_instance_count=1, instance_type=deploy_instance_type, endpoint_name=mpg_name
)

Inference

Finally, we can now validate the model for use. You can obtain the endpoint from the client library using the result from previous operations, and generate classifications from the trained model using that endpoint.

[ ]:
from sklearn.model_selection import train_test_split

# Reload the tiles and recreate the *same* held-out test split as Part 1:
# test_size=1000 and random_state=0 must match the original split exactly.
with h5py.File("data/camelyon16_tiles.h5", "r") as hf:
    X = hf["x"][()]
    y = hf["y"][()]

X_numpy = X[:]
y_numpy = y[:]

X_train, X_test, y_train, y_test = train_test_split(
    X_numpy, y_numpy, test_size=1000, random_state=0
)
[ ]:
# view test image
image = X_test[0]
label = y_test[0]
plt.imshow(image)
plt.axis("off")
plt.title(f"Label: {label}");  # trailing ';' suppresses the notebook echo
[ ]:
from PIL import Image

# Save one test tile as a JPEG file: the endpoint expects image bytes, not arrays.
img = Image.fromarray(X_test[0])
file_name = "data/test_image.jpg"
img.save(file_name)
[ ]:
import json

# Invoke the live endpoint with the raw JPEG bytes.
runtime = boto3.Session().client(service_name="runtime.sagemaker")
with open(file_name, "rb") as f:
    payload = f.read()
    payload = bytearray(payload)

response = runtime.invoke_endpoint(
    EndpointName=mpg_name, ContentType="application/x-image", Body=payload
)

result = response["Body"].read()

# the response body is JSON — a list of per-class probabilities; decode it
result = json.loads(result)
print(result)
[ ]:
# result holds one probability per class; the predicted class is the index
# with the maximum probability
index = np.argmax(result)
index
[ ]:
# Score every held-out test image against the endpoint.
predictions = []
for i in range(len(X_test)):
    # Encode the tile as JPEG in memory (`io` is imported at the top of the
    # notebook) instead of round-tripping every image through a file in /tmp.
    buffer = io.BytesIO()
    Image.fromarray(X_test[i]).save(buffer, format="JPEG")
    payload = bytearray(buffer.getvalue())

    response = runtime.invoke_endpoint(
        EndpointName=mpg_name, ContentType="application/x-image", Body=payload
    )

    # Response body is JSON per-class probabilities; keep the argmax class.
    result = json.loads(response["Body"].read())
    predictions.append(np.argmax(result))
[ ]:
from sklearn.metrics import precision_recall_fscore_support

# Without `average=`, sklearn returns per-class arrays; index 1 selects
# class 1 (presumably the positive/metastasis class — confirm label encoding).
precision, recall, f1, _ = precision_recall_fscore_support(y_test, predictions)
print(f"Precision = {precision[1]}")
print(f"Recall = {recall[1]}")
print(f"F1-Score = {f1[1]}")

Part 5: Clean up resources

[ ]:
# tear down the realtime endpoint so it stops incurring charges
best_model.sagemaker_session.delete_endpoint(mpg_name)
[ ]: