Computer Vision for Medical Imaging - Pipeline Mode

This notebook showcases techniques and services offered by SageMaker to build a model that predicts whether an image of cells contains cancer, and shows how to automate the ML workflow using SageMaker Pipelines.

Dataset

The dataset for this demo comes from the Camelyon16 Challenge and is made available under the CC0 license. The raw data provided by the challenge has been processed into 96x96 pixel tiles by Bas Veeling and also made available under the CC0 license. For detailed information on each dataset, please see the papers below:

* Ehteshami Bejnordi et al. Diagnostic Assessment of Deep Learning Algorithms for Detection of Lymph Node Metastases in Women With Breast Cancer. JAMA: The Journal of the American Medical Association, 318(22), 2199–2210. doi:10.1001/jama.2017.14585
* B. S. Veeling, J. Linmans, J. Winkens, T. Cohen, M. Welling. “Rotation Equivariant CNNs for Digital Pathology”. arXiv:1806.03962

The tiled dataset from Bas Veeling is over 6 GB. To make this demo easy to run, the dataset has been pruned to the first 14,000 tiles and is included in the repo alongside this notebook for convenience.

Update SageMaker SDK and Boto3

NOTE: You may get an error from pip’s dependency resolver; it can safely be ignored.

[ ]:
! pip install --upgrade sagemaker boto3

Import Libraries

[ ]:
# Helper to import a package, installing it first if it is missing


def import_or_install(package):
    try:
        __import__(package)
    except ImportError:
        ! pip install $package


# mxnet is needed later for the RecordIO conversion
required_packages = ["sagemaker", "boto3", "h5py", "tqdm", "matplotlib", "opencv-python", "mxnet"]

for package in required_packages:
    import_or_install(package)
[ ]:
import boto3
import sagemaker
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os
import zipfile
import h5py
import mxnet as mx
from datetime import datetime
from tqdm import tqdm

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString

Configure Boto3 Clients and Sessions

[ ]:
region = "us-west-2"  # Change region as needed
boto3.setup_default_session(region_name=region)
boto_session = boto3.Session(region_name=region)

s3_client = boto3.client("s3", region_name=region)

sagemaker_boto_client = boto_session.client("sagemaker")
sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session, sagemaker_client=sagemaker_boto_client
)
sagemaker_role = sagemaker.get_execution_role()

bucket = sagemaker_session.default_bucket()

Load Dataset

[ ]:
# check if directory exists
if not os.path.isdir("data"):
    os.mkdir("data")

# download zip file from public s3 bucket
!wget -P data https://sagemaker-sample-files.s3.amazonaws.com/datasets/image/pcam/medical_images.zip
[ ]:
with zipfile.ZipFile("data/medical_images.zip") as zf:
    zf.extractall("data")  # extract into data/ so data/camelyon16_tiles.h5 exists
with open("data/camelyon16_tiles.h5", "rb") as hf:
    f = h5py.File(hf, "r")

    X = f["x"][()]
    y = f["y"][()]

print("Shape of X:", X.shape)
print("Shape of y:", y.shape)
[ ]:
# write to session s3 bucket
s3_client.upload_file("data/medical_images.zip", bucket, f"data/medical_images.zip")
[ ]:
# delete local copy of the zip (os was imported above)
if os.path.exists("data/medical_images.zip"):
    os.remove("data/medical_images.zip")
else:
    print("The file does not exist")

View Sample Images from Dataset

[ ]:
def preview_images(X, y, n, cols):
    sample_images = X[:n]
    sample_labels = y[:n]

    rows = int(np.ceil(n / cols))
    fig, axs = plt.subplots(rows, cols, figsize=(11.5, 7))

    for i, ax in enumerate(axs.flatten()):
        image = sample_images[i]
        label = sample_labels[i]
        ax.imshow(image)
        ax.axis("off")
        ax.set_title(f"Label: {label}")

    plt.tight_layout()


preview_images(X, y, 15, 5)

Shuffle and Split Dataset

[ ]:
from sklearn.model_selection import train_test_split

X_numpy = X[:]
y_numpy = y[:]

X_train, X_test, y_train, y_test = train_test_split(
    X_numpy, y_numpy, test_size=1000, random_state=0
)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=2000, random_state=1)

print(X_train.shape)
print(X_val.shape)
print(X_test.shape)

Convert Splits to RecordIO Format

[ ]:
def write_to_recordio(X: np.ndarray, y: np.ndarray, prefix: str):
    record = mx.recordio.MXIndexedRecordIO(idx_path=f"{prefix}.idx", uri=f"{prefix}.rec", flag="w")
    for idx, arr in enumerate(tqdm(X)):
        header = mx.recordio.IRHeader(0, y[idx], idx, 0)
        s = mx.recordio.pack_img(
            header,
            arr,
            quality=95,
            img_fmt=".jpg",
        )
        record.write_idx(idx, s)
    record.close()
[ ]:
write_to_recordio(X_train, y_train, prefix="data/train")
write_to_recordio(X_val, y_val, prefix="data/val")
write_to_recordio(X_test, y_test, prefix="data/test")

Upload Data Splits to S3

[ ]:
prefix = "cv-metastasis"

try:
    s3_client.create_bucket(
        Bucket=bucket, ACL="private", CreateBucketConfiguration={"LocationConstraint": region}
    )
    print(f"Created S3 bucket: {bucket}")

except s3_client.exceptions.ClientError as e:
    if e.response["Error"]["Code"] == "BucketAlreadyOwnedByYou":
        print(f"Using existing bucket: {bucket}")
    else:
        raise
[ ]:
s3_client.upload_file("data/train.rec", bucket, f"{prefix}/data/train/train.rec")
s3_client.upload_file("data/val.rec", bucket, f"{prefix}/data/val/val.rec")
s3_client.upload_file("data/test.rec", bucket, f"{prefix}/data/test/test.rec")

Configure the Estimator

[ ]:
training_image = sagemaker.image_uris.retrieve("image-classification", region)
num_training_samples = X_train.shape[0]
num_classes = len(np.unique(y_train))

hyperparameters = {
    "num_layers": 18,
    "use_pretrained_model": 1,
    "augmentation_type": "crop_color_transform",
    "image_shape": "3,96,96",
    "num_classes": num_classes,
    "num_training_samples": num_training_samples,
    "mini_batch_size": 64,
    "epochs": 5,
    "learning_rate": 0.01,
    "precision_dtype": "float32",
}

estimator_config = {
    "hyperparameters": hyperparameters,
    "image_uri": training_image,
    "role": sagemaker.get_execution_role(),
    "instance_count": 1,
    "instance_type": "ml.p3.2xlarge",
    "volume_size": 100,
    "max_run": 360000,
    "output_path": f"s3://{bucket}/{prefix}/training_jobs",
}

image_classifier = sagemaker.estimator.Estimator(**estimator_config)

Pipeline

Step 1: Create RecordIO Splits

[ ]:
base_uri = f"s3://{bucket}/{prefix}/data"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path="data/camelyon16_tiles.h5", desired_s3_uri=base_uri
)

input_data = ParameterString(name="InputData", default_value=input_data_uri)
[ ]:
s3_client.upload_file(Filename="split_data.py", Bucket=bucket, Key=f"{prefix}/code/split_data.py")
split_data_script_uri = f"s3://{bucket}/{prefix}/code/split_data.py"
split_data_instance_type = "ml.t3.large"

sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",
    instance_type=split_data_instance_type,
    instance_count=1,
    base_job_name="image-classication-split-data",
    role=sagemaker_role,
)

split_data_step = ProcessingStep(
    name="SplitData",
    processor=sklearn_processor,
    inputs=[
        sagemaker.processing.ProcessingInput(
            source=input_data, destination="/opt/ml/processing/input"
        ),
    ],
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="train_data", source="/opt/ml/processing/output/data/train"
        ),
        sagemaker.processing.ProcessingOutput(
            output_name="val_data", source="/opt/ml/processing/output/data/val"
        ),
        sagemaker.processing.ProcessingOutput(
            output_name="test_data", source="/opt/ml/processing/output/data/test"
        ),
    ],
    code=split_data_script_uri,
)
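
The split_data.py script ships alongside this notebook and is uploaded above rather than shown inline. As a rough guide, here is a minimal sketch of the logic it is expected to implement, assuming it mirrors the in-notebook train_test_split and write_to_recordio steps (the paths follow the ProcessingInput/ProcessingOutput configuration above; the runtime pip install is needed because the scikit-learn container does not ship with mxnet):

[ ]:
# Hypothetical sketch of split_data.py; the actual script is provided with this notebook.
import os
import subprocess
import sys

# The SKLearnProcessor container does not include mxnet or OpenCV; install them at runtime.
subprocess.check_call([sys.executable, "-m", "pip", "install", "mxnet", "opencv-python-headless"])

import h5py
import mxnet as mx
from sklearn.model_selection import train_test_split

INPUT_DIR = "/opt/ml/processing/input"
OUTPUT_DIR = "/opt/ml/processing/output/data"


def write_to_recordio(X, y, prefix):
    record = mx.recordio.MXIndexedRecordIO(idx_path=f"{prefix}.idx", uri=f"{prefix}.rec", flag="w")
    for idx, arr in enumerate(X):
        header = mx.recordio.IRHeader(0, y[idx], idx, 0)
        record.write_idx(idx, mx.recordio.pack_img(header, arr, quality=95, img_fmt=".jpg"))
    record.close()


if __name__ == "__main__":
    with h5py.File(os.path.join(INPUT_DIR, "camelyon16_tiles.h5"), "r") as f:
        X, y = f["x"][()], f["y"][()]

    # Same split sizes and seeds as the interactive split earlier in this notebook
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=1000, random_state=0)
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=2000, random_state=1)

    for split, X_s, y_s in [("train", X_train, y_train), ("val", X_val, y_val), ("test", X_test, y_test)]:
        os.makedirs(os.path.join(OUTPUT_DIR, split), exist_ok=True)
        write_to_recordio(X_s, y_s, prefix=os.path.join(OUTPUT_DIR, split, split))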

Step 2: Train Model

[ ]:
train_step_inputs = {
    "train": sagemaker.inputs.TrainingInput(
        s3_data=split_data_step.properties.ProcessingOutputConfig.Outputs[
            "train_data"
        ].S3Output.S3Uri,
        content_type="application/x-recordio",
        s3_data_type="S3Prefix",
        input_mode="Pipe",
    ),
    "validation": sagemaker.inputs.TrainingInput(
        s3_data=split_data_step.properties.ProcessingOutputConfig.Outputs[
            "val_data"
        ].S3Output.S3Uri,
        content_type="application/x-recordio",
        s3_data_type="S3Prefix",
        input_mode="Pipe",
    ),
}

train_step = TrainingStep(name="TrainModel", estimator=image_classifier, inputs=train_step_inputs)

Step 3: Register Model

[ ]:
mpg_name = "cv-metastasis-{}".format(datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

register_step = RegisterModel(
    name="RegisterModel",
    estimator=image_classifier,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["image/jpeg"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=mpg_name,
    approval_status=model_approval_status,
)
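
Once registered, a model version’s approval status gates whether it can be deployed from the registry. After the pipeline has run, the registered versions can be listed (and, if ModelApprovalStatus was left at its default of PendingManualApproval, approved) through the SageMaker API. A minimal sketch:

[ ]:
# List the model package versions registered under this group (run after the pipeline executes)
packages = sagemaker_boto_client.list_model_packages(ModelPackageGroupName=mpg_name)
for pkg in packages["ModelPackageSummaryList"]:
    print(pkg["ModelPackageArn"], pkg["ModelApprovalStatus"])

# Hypothetical manual approval of the most recent version:
# sagemaker_boto_client.update_model_package(
#     ModelPackageArn=packages["ModelPackageSummaryList"][0]["ModelPackageArn"],
#     ModelApprovalStatus="Approved",
# )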

Step 4: Create Model

[ ]:
model = sagemaker.model.Model(
    name=f"{mpg_name}-pipline",
    image_uri=training_image,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=sagemaker_role,
)

inputs = sagemaker.inputs.CreateModelInput(instance_type="ml.m4.xlarge")

create_model_step = CreateModelStep(name="ModelPreDeployment", model=model, inputs=inputs)

Step 5: Deploy Model

[ ]:
s3_client.upload_file(
    Filename="deploy_model.py", Bucket=bucket, Key=f"{prefix}/code/deploy_model.py"
)
deploy_model_script_uri = f"s3://{bucket}/{prefix}/code/deploy_model.py"
deploy_instance_type = "ml.m4.xlarge"

deploy_model_processor = SKLearnProcessor(
    framework_version="1.0-1",
    role=sagemaker_role,
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name=f"{prefix}-deploy-model",
    sagemaker_session=sagemaker_session,
)

deploy_step = ProcessingStep(
    name="DeployModel",
    processor=deploy_model_processor,
    job_arguments=[
        "--model-name",
        create_model_step.properties.ModelName,
        "--region",
        region,
        "--endpoint-instance-type",
        deploy_instance_type,
        "--endpoint-name",
        "cv-model-pipeline",
    ],
    code=deploy_model_script_uri,
)
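
Like split_data.py, the deploy_model.py script is provided with this notebook rather than shown inline. Here is a rough sketch of what it is expected to do, assuming it creates an endpoint config and endpoint from the model name passed in (the argument names match the job_arguments above; the config naming convention is illustrative):

[ ]:
# Hypothetical sketch of deploy_model.py; the actual script is provided with this notebook.
import argparse

import boto3

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-name", type=str, required=True)
    parser.add_argument("--region", type=str, required=True)
    parser.add_argument("--endpoint-instance-type", type=str, required=True)
    parser.add_argument("--endpoint-name", type=str, required=True)
    args = parser.parse_args()

    sm_client = boto3.client("sagemaker", region_name=args.region)

    # Create an endpoint configuration pointing at the model from the CreateModel step
    config_name = f"{args.endpoint_name}-config"
    sm_client.create_endpoint_config(
        EndpointConfigName=config_name,
        ProductionVariants=[
            {
                "VariantName": "AllTraffic",
                "ModelName": args.model_name,
                "InstanceType": args.endpoint_instance_type,
                "InitialInstanceCount": 1,
            }
        ],
    )

    # Create the real-time endpoint (a production script would also handle updates)
    sm_client.create_endpoint(EndpointName=args.endpoint_name, EndpointConfigName=config_name)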

Create Pipeline

[ ]:
pipeline_name = "{}-pipeline-{}".format(prefix, datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[input_data, model_approval_status],
    steps=[split_data_step, train_step, register_step, create_model_step, deploy_step],
)

pipeline.upsert(role_arn=sagemaker_role)
[ ]:
parameters = {"ModelApprovalStatus": "Approved"}

start_response = pipeline.start(parameters=parameters)
start_response.wait(max_attempts=100)
start_response.describe()

Lineage

Review the lineage of the artifacts generated by the pipeline.

[ ]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer
from pprint import pprint


viz = LineageTableVisualizer(sagemaker_session)
for execution_step in reversed(start_response.list_steps()):
    pprint(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

Clean Up Resources

[ ]:
def delete_model_package_group(sm_client, package_group_name):
    try:
        model_versions = sm_client.list_model_packages(ModelPackageGroupName=package_group_name)

    except Exception as e:
        print("{} \n".format(e))
        return

    for model_version in model_versions["ModelPackageSummaryList"]:
        try:
            sm_client.delete_model_package(ModelPackageName=model_version["ModelPackageArn"])
        except Exception as e:
            print("{} \n".format(e))
        time.sleep(0.5)  # Ensure requests aren't throttled

    try:
        sm_client.delete_model_package_group(ModelPackageGroupName=package_group_name)
        print("{} model package group deleted".format(package_group_name))
    except Exception as e:
        print("{} \n".format(e))
    return


def delete_sagemaker_pipeline(sm_client, pipeline_name):
    try:
        sm_client.delete_pipeline(
            PipelineName=pipeline_name,
        )
        print("{} pipeline deleted".format(pipeline_name))
    except Exception as e:
        print("{} \n".format(e))
        return
[ ]:
client = sagemaker_boto_client  # reuse the SageMaker client configured earlier
delete_model_package_group(client, mpg_name)
delete_sagemaker_pipeline(client, pipeline_name)
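
The pipeline’s deploy step also stood up a real-time endpoint named cv-model-pipeline, which the helpers above do not remove. A sketch for deleting it to stop ongoing charges; the endpoint config name is an assumption based on the deploy script sketch earlier, so verify the actual name in the SageMaker console:

[ ]:
# Delete the endpoint created by the DeployModel step
try:
    client.delete_endpoint(EndpointName="cv-model-pipeline")
    # The config name below is an assumption; verify it in the SageMaker console.
    client.delete_endpoint_config(EndpointConfigName="cv-model-pipeline-config")
except Exception as e:
    print(e)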