Move Amazon SageMaker Autopilot ML models from experimentation to production using Amazon SageMaker Pipelines


This notebook’s CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.

This us-west-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable


Amazon SageMaker Autopilot automatically builds, trains, and tunes the best custom machine learning (ML) models based on your data. It’s an automated machine learning (AutoML) solution that eliminates the heavy lifting of handwritten ML models that requires ML expertise. Data scientists need to only provide a tabular dataset and select the target column to predict, and Autopilot automatically infers the problem type, performs data preprocessing and feature engineering, selects the algorithms and training mode, and explores different configurations to find the best ML model. Then you can directly deploy the model to an Amazon SageMaker endpoint or iterate on the recommended solutions to further improve the model quality.

Although Autopilot eliminates the heavy lifting of building ML models, MLOps engineers still have to create, automate, and manage end-to-end ML workflows. SageMaker Pipelines helps you automate the different steps of the ML lifecycle, including data preprocessing, training, tuning and evaluating ML models, and deploying them.

This notebook demonstrates how to leverage SageMaker Autopilot as part of a SageMaker Pipelines end-to-end AutoML training workflow. This notebook has successfully been run using SageMaker Studio with the Amazon Linux 2, Jupyter Lab 3 platform identifier. When running this notebook with older versions of SageMaker Studio or a SageMaker Notebook Instance, the boto3 and/or sagemaker packages might need to be upgraded.

Alternatively, when using SageMaker Autopilot with Ensembling Mode, you may also refer to the notebook example on how to useSageMaker Pipeline’s native AutoML stepinstead.

Imports

[ ]:
# Install Reinvent Wheels
! aws s3 cp s3://reinvent-rc-wheels/2022/dist/sagemaker.tar.gz /tmp/ --region us-west-2
! pip install /tmp/sagemaker.tar.gz --force-reinstall

! aws s3 cp s3://reinvent-rc-wheels/2022/boto3/awscli.tar.gz /tmp/ --region us-west-2
! pip install /tmp/awscli.tar.gz --force-reinstall

! aws s3 cp s3://reinvent-rc-wheels/2022/boto3/boto3.tar.gz /tmp/ --region us-west-2
! pip install /tmp/boto3.tar.gz --force-reinstall

! aws s3 cp s3://reinvent-rc-wheels/2022/boto3/botocore.tar.gz /tmp/ --region us-west-2
! pip install /tmp/botocore.tar.gz --force-reinstall
[ ]:
import boto3
import os
import pandas as pd
import sagemaker
import time
from datetime import datetime
from sagemaker import ModelPackage
from sagemaker.image_uris import retrieve
from sagemaker.lambda_helper import Lambda
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.callback_step import CallbackStep
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, CacheConfig

Initialization

[ ]:
boto_session = boto3.session.Session()
aws_region = boto_session.region_name
sagemaker_client = boto_session.client("sagemaker")
lambda_client = boto_session.client("lambda")
sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session, sagemaker_client=sagemaker_client
)
sqs_client = boto_session.client(
    "sqs",
    region_name=aws_region,
    endpoint_url=f"https://sqs.{aws_region}.amazonaws.com",
)
DATASET_PATH = os.path.join("data", "diabetic", "data", "diabetic_transformed.csv")
BUCKET_NAME = sagemaker_session.default_bucket()
PROCESSING_JOB_LOCAL_BASE_PATH = "/opt/ml/processing"

IAM permissions

For demo purposes, this notebook simplifies the IAM permissions configuration when creating required IAM roles that can be assumed by the SageMaker and Lambda services. The following managed policies are sufficient to run this notebook but should be further scoped down to improve security (least privilege principle). - Lambda Execution Role: - AmazonSageMakerFullAccess - AmazonSQSFullAccess - SageMaker Execution Role: - AmazonSageMakerFullAccess - AWSLambda_FullAccess - AmazonSQSFullAccess

[ ]:
# TODO: need to replace the lambda execution role name by its actual value
lambda_execution_role_name = ""
aws_account_id = boto3.client("sts").get_caller_identity().get("Account")
LAMBDA_EXECUTION_ROLE_ARN = f"arn:aws:iam::{aws_account_id}:role/{lambda_execution_role_name}"  # to be assumed by the Lambda service
SAGEMAKER_EXECUTION_ROLE_ARN = (
    sagemaker.get_execution_role()
)  # to be assumed by the SageMaker service

SageMaker Pipelines parameters

[ ]:
cache_config = CacheConfig(enable_caching=False)
autopilot_job_name = ParameterString(
    name="AutopilotJobName",
    default_value="autopilot-" + datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
)
model_package_name = ParameterString(
    name="ModelPackageName",
    default_value=autopilot_job_name.default_value + "-model-package",
)
target_attribute_name = ParameterString(name="TargetAttributeName", default_value="readmitted")
train_val_dataset_s3_path = ParameterString(
    name="TrainValDatasetS3Path",
    default_value=sagemaker.s3.s3_path_join(
        "s3://", BUCKET_NAME, autopilot_job_name.default_value, "data", "train_val.csv"
    ),
)
x_test_s3_path = ParameterString(
    name="XTestDatasetS3Path",
    default_value=sagemaker.s3.s3_path_join(
        "s3://", BUCKET_NAME, autopilot_job_name.default_value, "data", "x_test.csv"
    ),
)
y_test_s3_path = ParameterString(
    name="YTestDatasetS3Path",
    default_value=sagemaker.s3.s3_path_join(
        "s3://", BUCKET_NAME, autopilot_job_name.default_value, "data", "y_test.csv"
    ),
)
max_autopilot_candidates = ParameterInteger(name="MaxAutopilotCandidates", default_value=16)
max_autopilot_job_runtime = ParameterInteger(
    name="MaxAutoMLJobRuntimeInSeconds", default_value=7200  # 2 hours
)
max_autopilot_training_job_runtime = ParameterInteger(
    name="MaxRuntimePerTrainingJobInSeconds", default_value=3600
)  # 1 hour
instance_count = ParameterInteger(name="InstanceCount", default_value=1)
instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="Approved")
batch_transform_output_s3_path = ParameterString(
    name="BatchTransformOutputS3Path",
    default_value=sagemaker.s3.s3_path_join(
        "s3://", BUCKET_NAME, autopilot_job_name.default_value, "batch-transform-output"
    ),
)
training_output_s3_path = ParameterString(
    name="TrainingOutputS3Path",
    default_value=sagemaker.s3.s3_path_join(
        "s3://", BUCKET_NAME, autopilot_job_name.default_value, "training-output"
    ),
)

Get dataset

We use a publicly available hospital readmission dataset - diabetic patients’ dataset to predict re-admission of diabetic patients within 30 days post discharge. This is a multi-class classification problem since the readmission options are either “< 30” if the patient is readmitted within 30 days, “> 30” if the patient is readmitted after 30 days or “no” for no record of readmission.

[ ]:
!mkdir data
!wget https://static.us-east-1.prod.workshops.aws/public/d56bf7ad-9738-4edf-9be0-f03cd22d8cf2/static/resources/hcls/diabetic.zip -nc -O data/data.zip
!unzip -o data/data.zip -d data
[ ]:
data = pd.read_csv(DATASET_PATH)
train_val_data = data.sample(frac=0.8)
test_data = data.drop(train_val_data.index)
train_val_data.to_csv(train_val_dataset_s3_path.default_value, index=False, header=True)
test_data.drop(target_attribute_name.default_value, axis=1).to_csv(
    x_test_s3_path.default_value, index=False, header=False
)
test_data[target_attribute_name.default_value].to_csv(
    y_test_s3_path.default_value, index=False, header=True
)

First pipeline step: start Autopilot job

This pipeline step uses a Lambda step which runs a serverless Lambda function we create. The Lambda function in the start_autopilot_job.py script creates a SageMaker Autopilot job.

[ ]:
lambda_start_autopilot_job = Lambda(
    function_name="StartSagemakerAutopilotJob",
    execution_role_arn=LAMBDA_EXECUTION_ROLE_ARN,
    script="start_autopilot_job.py",
    handler="start_autopilot_job.lambda_handler",
    session=sagemaker_session,
)
lambda_start_autopilot_job.upsert()
step_start_autopilot_job = LambdaStep(
    name="StartAutopilotJobStep",
    lambda_func=lambda_start_autopilot_job,
    inputs={
        "TrainValDatasetS3Path": train_val_dataset_s3_path.default_value,
        "MaxCandidates": max_autopilot_candidates.default_value,
        "MaxRuntimePerTrainingJobInSeconds": max_autopilot_training_job_runtime.default_value,
        "MaxAutoMLJobRuntimeInSeconds": max_autopilot_job_runtime.default_value,
        "TargetAttributeName": target_attribute_name.default_value,
        "TrainingOutputS3Path": training_output_s3_path.default_value,
        "AutopilotJobName": autopilot_job_name,
        "ProblemType": "MulticlassClassification",
        "AutopilotExecutionRoleArn": SAGEMAKER_EXECUTION_ROLE_ARN,
        "AutopilotObjectiveMetricName": "F1macro",
        "AutopilotMode": "ENSEMBLING",
    },
    cache_config=cache_config,
)

Second pipeline step: check Autopilot job status

The step repeatedly keeps track of the training job status by leveraging a separate Lambda function in check_autopilot_job_status.py until the Autopilot training job’s completion.

[ ]:
lambda_check_autopilot_job_status_function_name = "CheckSagemakerAutopilotJobStatus"
lambda_check_autopilot_job_status = Lambda(
    function_name=lambda_check_autopilot_job_status_function_name,
    execution_role_arn=LAMBDA_EXECUTION_ROLE_ARN,
    script="check_autopilot_job_status.py",
    handler="check_autopilot_job_status.lambda_handler",
    session=sagemaker_session,
    timeout=15,
)
lambda_check_autopilot_job_status.upsert()
queue_url = sqs_client.create_queue(
    QueueName="AutopilotSagemakerPipelinesSqsCallback",
    Attributes={"DelaySeconds": "5", "VisibilityTimeout": "300"},
)[
    "QueueUrl"
]  # 5 minutes timeout
# Add event source mapping
try:
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=sqs_client.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )["Attributes"]["QueueArn"],
        FunctionName=lambda_check_autopilot_job_status_function_name,
        Enabled=True,
        BatchSize=1,
    )
except lambda_client.exceptions.ResourceConflictException:
    pass
step_check_autopilot_job_status_callback = CallbackStep(
    name="CheckAutopilotJobStatusCallbackStep",
    sqs_queue_url=queue_url,
    inputs={"AutopilotJobName": autopilot_job_name},
    outputs=[],
    depends_on=[step_start_autopilot_job],
)

Third pipeline step: evaluate Autopilot model

The SageMaker Processing step launches a SageMaker Batch Transform Job to evaluate the trained SageMaker Autopilot model against an evaluation dataset and generates the performance metrics evaluation report and model explainability metrics.

[ ]:
processing_evaluation = SKLearnProcessor(
    role=SAGEMAKER_EXECUTION_ROLE_ARN,
    framework_version="1.0-1",
    instance_count=instance_count.default_value,
    instance_type=instance_type.default_value,
    sagemaker_session=sagemaker_session,
)
step_autopilot_model_evaluation = ProcessingStep(
    name="EvaluateBestAutopilotModelStep",
    job_arguments=[
        "--autopilot-job-name",
        autopilot_job_name,
        "--aws-region",
        aws_region,
        "--batch-transform-output-s3-path",
        batch_transform_output_s3_path.default_value,
        "--instance-type",
        instance_type.default_value,
        "--instance-count",
        str(instance_count.default_value),
        "--local-base-path",
        PROCESSING_JOB_LOCAL_BASE_PATH,
        "--sagemaker-execution-role-arn",
        SAGEMAKER_EXECUTION_ROLE_ARN,
        "--x-test-s3-path",
        x_test_s3_path.default_value,
        "--y-test-file-name",
        y_test_s3_path.default_value.split("/")[-1],
    ],
    processor=processing_evaluation,
    code="evaluate_autopilot_model.py",
    depends_on=[step_check_autopilot_job_status_callback],
    inputs=[
        ProcessingInput(
            input_name="LabelsTestDataset",
            source=y_test_s3_path.default_value,
            destination=os.path.join(PROCESSING_JOB_LOCAL_BASE_PATH, "data"),
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="EvaluationReport",
            source=os.path.join(PROCESSING_JOB_LOCAL_BASE_PATH, "evaluation_report"),
        )
    ],
    cache_config=cache_config,
)

Fourth pipeline step: register Autopilot model

Using a Lambda step, the Lambda function in register_autopilot_job.py registers the SageMaker Autopilot model to the SageMaker Model Registry using the evaluation report obtained in the previous SageMaker Processing step.

[ ]:
lambda_register_autopilot_model = Lambda(
    function_name="RegisterSagemakerAutopilotModel",
    execution_role_arn=LAMBDA_EXECUTION_ROLE_ARN,
    script="register_autopilot_model.py",
    handler="register_autopilot_model.lambda_handler",
    session=sagemaker_session,
    timeout=15,
)
lambda_register_autopilot_model.upsert()
step_register_autopilot_model = LambdaStep(
    name="RegisterAutopilotModelStep",
    lambda_func=lambda_register_autopilot_model,
    inputs={
        "AutopilotJobName": autopilot_job_name,
        "EvaluationReportS3Path": step_autopilot_model_evaluation.properties.ProcessingOutputConfig.Outputs[
            "EvaluationReport"
        ].S3Output.S3Uri,
        "ModelPackageName": model_package_name.default_value,
        "ModelApprovalStatus": model_approval_status.default_value,
        "InstanceType": instance_type.default_value,
    },
    cache_config=cache_config,
)

Create and run pipeline

Once the pipeline steps are defined, we combine them into a SageMaker Pipeline. The steps are run in sequential order. The pipeline executes all of the steps for an AutoML job leveraging SageMaker Autopilot and SageMaker Pipelines for training, model evaluation and model registration.

[ ]:
pipeline = Pipeline(
    name="autopilot-demo-pipeline",
    parameters=[
        autopilot_job_name,
        target_attribute_name,
        train_val_dataset_s3_path,
        x_test_s3_path,
        y_test_s3_path,
        max_autopilot_candidates,
        max_autopilot_job_runtime,
        max_autopilot_training_job_runtime,
        instance_count,
        instance_type,
        model_approval_status,
    ],
    steps=[
        step_start_autopilot_job,
        step_check_autopilot_job_status_callback,
        step_autopilot_model_evaluation,
        step_register_autopilot_model,
    ],
    sagemaker_session=sagemaker_session,
)
pipeline.upsert(role_arn=SAGEMAKER_EXECUTION_ROLE_ARN)
pipeline_execution = pipeline.start()
pipeline_execution.wait(delay=20, max_attempts=24 * 60 * 3)  # max wait: 24 hours

Deploy the Autopilot Model Endpoint

Deploying the previously registered best Autopilot model from the ML training pipeline to a SageMaker Endpoint.

[ ]:
model = ModelPackage(
    role=SAGEMAKER_EXECUTION_ROLE_ARN,
    model_package_arn=model_package_name.default_value,
    sagemaker_session=sagemaker_session,
)
while (
    sagemaker_client.describe_model_package(ModelPackageName=model_package_name.default_value)[
        "ModelPackageStatus"
    ]
    != "Completed"
):
    time.sleep(10)
model.deploy(
    initial_instance_count=instance_count.default_value,
    instance_type=instance_type.default_value,
)

Cleanup

[ ]:
sagemaker_client.delete_endpoint(EndpointName=model.endpoint_name)

Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.

This us-east-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This us-east-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This us-west-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ca-central-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This sa-east-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-west-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-west-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-west-3 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-central-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-north-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-southeast-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-southeast-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-northeast-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-northeast-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-south-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable