Parameterize SageMaker Pipelines


This notebook’s CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.

This us-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable


Customers can use SageMaker Pipelines to build scalable machine learning pipelines that preprocess data and train machine learning models. With SageMaker Pipelines, customers have a toolkit for every part of the machine learning lifecycle that provides deep customizations and tuning options to fit every organization. Customers have the freedom to customize SageMaker Pipelines to specific use cases, but also to create generic machine learning pipelines that can be reused across different use cases.

From a birds-eye view a machine learning pipeline usually consists of 3 general steps: a preprocess step where the data is transformed, a training step where a machine learning model is trained, and an evaluation step which tests the performance of the trained model. If the model is performing according to the objective metric you’re optimizing for, then that becomes a candidate model for deployment to one or more environments. These candidate models should be registered into SageMaker Model Registry to catalog and store key metadata for that model version.

SageMaker Pipelines

These steps have a lot of commonalities, even across different machine learning use cases. Customers that want to create training pipelines that can be re-used in an organization can use SageMaker Pipelines to create parameterized, generic training pipelines. Parameters allow customers to identify specific parameters that can be passed into the pipeline during pipeline execution without having to directly change the pipeline code itself.

This notebook demonstrates how SageMaker Pipelines can be used to create a generic binary classification machine learning pipeline using XGBoost that’s reusable across teams, machine learning use cases and even customers in a SaaS system.

SageMaker Pipelines

Amazon SageMaker Pipelines is a purpose-built, easy-to-use CI/CD service for machine learning. With SageMaker Pipelines, customers can create machine learning workflows with an easy-to-use Python SDK, and then visualize and manage workflows using Amazon SageMaker Studio.

SageMaker Pipeline steps and parameters

SageMaker pipelines works on the concept of steps. The order steps are executed in is inferred from the dependencies each step has. If a step has a dependency on the output from a previous step, it’s not executed until after that step has completed successfully.

SageMaker Pipeline Parameters are input parameters specified when triggering a pipeline execution. They need to be explicitly defined when creating the pipeline and contain default values.

To know more about the type of steps and parameters supported, check out the SageMaker Pipelines Overview.

SageMaker Pipeline DAG

When creating a SageMaker Pipeline, SageMaker creates a Direct Acyclic Graph, DAG, that customers can visualize in Amazon SageMaker Studio. The DAG can be used to track pipeline executions, outputs and metrics. In this notebook, a SageMaker Pipeline with the following DAG is created:

SageMaker Pipeline Direct Acyclic Graph

Predict customer churn and credit risk with XGBoost

Data

This notebook uses 2 datasets to demonstrate pipeline portability: 1. A synthetic customer churn dataset. 2. The Statlog German credit data from UCI’s ML Repository.

Overview

Disclaimer This notebook was created using Amazon SageMaker Studio and the Python3(DataScience) kernel. SageMaker Studio is required for the visualizations of the DAG and model metrics to work.

The purpose of this notebook is to demonstrate how SageMaker Pipelines can be used to create a generic XGBoost training pipeline that preprocesses, trains, tunes, evaluates and registers new machine learning models with the SageMaker model registry, that is reusable across teams, customers and use cases. All scripts to preprocess the data and evaluate the trained model have been prepared in advance and are available here: - credit/preprocess.py - customer_churn/preprocess.py - evaluate.py

[ ]:
!pip install -U sagemaker==2.72.1 --quiet # Ensure correct version of SageMaker is installed
[ ]:
import boto3
import sagemaker
import sagemaker.session
[ ]:
session = sagemaker.session.Session()
region = session.boto_region_name
role = sagemaker.get_execution_role()
bucket = session.default_bucket()
[ ]:
prefix = "paramaterized"  # Prefix to S3 artifacts
pipeline_name = "DEMO-parameterized-pipeline"  # SageMaker Pipeline name
credit_model_group = "DEMO-credit-registry"
churn_model_group = "DEMO-churn-registry"

Download data

Start with downloading all data sets

[ ]:
s3 = boto3.client("s3")
s3.download_file(
    f"sagemaker-example-files-prod-{region}",
    "datasets/tabular/uci_statlog_german_credit_data/german_credit_data.csv",
    "credit_risk/german_credit_data.csv",
)
s3.download_file(
    f"sagemaker-example-files-prod-{region}",
    "datasets/tabular/synthetic/churn.csv",
    "customer_churn/churn-dataset.csv",
)

Upload data

Upload all data sets and scripts to S3.

[ ]:
# Upload the raw datasets and scripts to S3
customer_churn_data_uri = session.upload_data(
    path="customer_churn/churn-dataset.csv", key_prefix=prefix + "/data"
)

credit_data_uri = session.upload_data(
    path="credit_risk/german_credit_data.csv", key_prefix=prefix + "/data"
)

churn_preprocess_uri = session.upload_data(
    path="customer_churn/preprocess.py", key_prefix=prefix + "/preprocess/churn"
)

credit_preprocess_uri = session.upload_data(
    path="credit_risk/preprocess.py", key_prefix=prefix + "/preprocess/credit"
)

evaluate_script_uri = session.upload_data(path="evaluate.py", key_prefix=prefix + "/evaluate")


print("Customer churn data set uploaded to ", customer_churn_data_uri)
print("Credit data set uploaded to ", credit_data_uri)

print("Customer churn preprocessing script uploaded to ", churn_preprocess_uri)
print("Credit preprocessing script uploaded to ", credit_preprocess_uri)

print("Evaluation script uploaded to ", evaluate_script_uri)

Pipeline input parameters

Pipeline Parameters are input parameter when triggering a pipeline execution. They need to be explicitly defined when creating the pipeline and contain default values.

Create parameters for the inputs to the pipeline. In this case, parameters will be used for: - ModelGroup - Which registry to register the trained model with. - InputData - S3 URI to pipeline input data. - PreprocessScript - S3 URI to python script to preprocess the data. - EvaluateScript - S3 URI to python script to evaluate the trained model. - MaxiumTrainingJobs - How many training jobs to allow when hyperparameter tuning the model - MaxiumParallelTrainingJobs - How many training jobs to allow in parallel when hyperparameter tuning the model. - AccuracyConditionThreshold - Only register models with the model registry if the have at least this classification accuracy. - ProcessingInstanceType - What EC2 instance type to use for processing. - TrainingInstanceType - What EC2 instance type to use for training.

[ ]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# To what Registry to register the model and its versions.
model_registry_package = ParameterString(name="ModelGroup", default_value="default-registry")

# S3 URI to input data
input_data = ParameterString(name="InputData", default_value="s3://{}/uri/data.csv".format(bucket))

# S3 URI to preprocessing script
preprocess_script = ParameterString(
    name="PreprocessScript", default_value="s3://{}/uri/preprocess.py".format(bucket)
)

# S3 URI to evaluation script
evaluate_script = ParameterString(
    name="EvaluateScript", default_value="s3://{}/uri/evaluate.py".format(bucket)
)

# Maximum amount of training jobs to allow in the HP tuning
max_training_jobs = ParameterInteger(name="MaxiumTrainingJobs", default_value=1)

# Maximum amount of trainingjobs to allow in the HP tuning
max_parallel_training_jobs = ParameterInteger(name="MaxiumParallelTrainingJobs", default_value=1)

# Accuracy threshold to decide whether or not to register the model with Model Registry
accuracy_condition_threshold = ParameterFloat(name="AccuracyConditionThreshold", default_value=0.7)

# What instance type to use for processing.
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.large"
)

# What instance type to use for training.
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")

In the first step an sklearn processor is created, used in the ProcessingStep.

[ ]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.execution_variables import ExecutionVariables

# Create SKlearn processor object,
# The object contains information about what instance type to use, the IAM role to use etc.
# A managed processor comes with a preconfigured container, so only specifying version is required.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1", role=role, instance_type=processing_instance_type, instance_count=1
)

# Use the sklearn_processor in a SageMaker Pipelines ProcessingStep
step_preprocess_data = ProcessingStep(
    name="Preprocess-Data",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "train",
                ],
            ),
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "validation",
                ],
            ),
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "test",
                ],
            ),
        ),
    ],
    code=preprocess_script,
)

In the second step, the train and validation output from the previous processing step are used to train a model. The XGBoost container is retrieved and then an XGBoost estimator is created, on which hyperparameters are specified before the training step is created.

[ ]:
from sagemaker.inputs import TrainingInput
from sagemaker.estimator import Estimator
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter
from sagemaker.workflow.steps import TuningStep

# Fetch container to use for training
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.2-2",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

# Create XGBoost estimator object
# The object contains information about what container to use, what instance type etc.
xgb_estimator = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    role=role,
    disable_profiler=True,
)

# Create Hyperparameter tuner object. Ranges from https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost-tuning.html
xgb_tuner = HyperparameterTuner(
    estimator=xgb_estimator,
    objective_metric_name="validation:auc",
    hyperparameter_ranges={
        "eta": ContinuousParameter(0, 0.5),
        "alpha": ContinuousParameter(0, 1000),
        "min_child_weight": ContinuousParameter(1, 120),
        "max_depth": IntegerParameter(1, 10),
        "num_round": IntegerParameter(1, 2000),
        "subsample": ContinuousParameter(0.5, 1),
    },
    max_jobs=max_training_jobs,
    max_parallel_jobs=max_parallel_training_jobs,
)

# use the tuner in a SageMaker pipielines tuning step.
step_tuning = TuningStep(
    name="Train-And-Tune-Model",
    tuner=xgb_tuner,
    inputs={
        "train": TrainingInput(
            s3_data=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)

When a model is trained, it’s common to evaluate the model on unseen data before registering it with the model registry. This ensures the model registry isn’t cluttered with poorly performing model versions.

[ ]:
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile

# Create ScriptProcessor object.
# The object contains information about what container to use, what instance type etc.
evaluate_model_processor = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    role=role,
)

# Create a PropertyFile
# A PropertyFile is used to be able to reference outputs from a processing step, for instance to use in a condition step.
# For more information, visit https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# Use the evaluate_model_processor in a SageMaker Pipelines ProcessingStep.
# Extract the best model for evaluation.
step_evaluate_model = ProcessingStep(
    name="Evaluate-Model",
    processor=evaluate_model_processor,
    inputs=[
        ProcessingInput(
            source=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=bucket),
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "evaluation-report",
                ],
            ),
        ),
    ],
    code=evaluate_script,
    property_files=[evaluation_report],
)

If the trained model meets the model performance requirements, a new model version is registered with the model registry for further analysis. To attach model metrics to the model version, create a ModelMetrics object using the evaluation report created in the evaluation step. Then, create the RegisterModel step.

[ ]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

# Create ModelMetrics object using the evaluation report from the evaluation step
# A ModelMetrics object contains metrics captured from a model.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(
            on="/",
            values=[
                step_evaluate_model.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
                    "S3Uri"
                ],
                "evaluation.json",
            ],
        ),
        content_type="application/json",
    )
)

# Crete a RegisterModel step, which registers the model with SageMaker Model Registry.
step_register_model = RegisterModel(
    name="Register-Model",
    estimator=xgb_estimator,
    model_data=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=bucket),
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge", "ml.m5.large"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_registry_package,
    model_metrics=model_metrics,
)

Adding conditions to the pipeline is done with a ConditionStep. In this case, we only want to register the new model version with the model registry if the new model meets an accuracy condition.

[ ]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Create accuracy condition to ensure the model meets performance requirements.
# Models with a test accuracy lower than the condition will not be registered with the model registry.
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_evaluate_model.name,
        property_file=evaluation_report,
        json_path="binary_classification_metrics.accuracy.value",
    ),
    right=accuracy_condition_threshold,
)

# Create a SageMaker Pipelines ConditionStep, using the condition above.
# Enter the steps to perform if the condition returns True / False.
step_cond = ConditionStep(
    name="Accuracy-Condition",
    conditions=[cond_gte],
    if_steps=[step_register_model],
    else_steps=[],
)

Now that all pipeline steps are created, a pipeline is created.

[ ]:
from sagemaker.workflow.pipeline import Pipeline

# Create a SageMaker Pipeline.
# Each parameter for the pipeline must be set as a parameter explicitly when the pipeline is created.
# Also pass in each of the steps created above.
# Note that the order of execution is determined from each step's dependencies on other steps,
# not on the order they are passed in below.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        training_instance_type,
        input_data,
        preprocess_script,
        evaluate_script,
        accuracy_condition_threshold,
        model_registry_package,
        max_parallel_training_jobs,
        max_training_jobs,
    ],
    steps=[step_preprocess_data, step_tuning, step_evaluate_model, step_cond],
)
[ ]:
# Submit pipeline
pipeline.upsert(role_arn=role)

Now that the pipeline is created, it can be started with custom parameters making the pipeline agnostic to who is triggering it, but also to the scripts and data used. The pipeline can be started using the CLI, the SageMaker Studio UI or the SDK and below there is a screenshot of what it looks like in the SageMaker Studio UI.

Trigger pipeline from Studio

Starting the pipeline with the SDK

In the examples below, the pipeline is triggered for two machine learning problems, each with different preprocessing scripts and model registry. Each machine learning problem is run with two different sets of parameters.

[ ]:
# Start pipeline with credit data and preprocessing script
pipeline.start(
    execution_display_name="Credit",
    parameters=dict(
        InputData=credit_data_uri,
        PreprocessScript=credit_preprocess_uri,
        EvaluateScript=evaluate_script_uri,
        AccuracyConditionThreshold=0.2,
        MaxiumParallelTrainingJobs=2,
        MaxiumTrainingJobs=5,
        ModelGroup=credit_model_group,
    ),
)
[ ]:
# Start pipeline with credit data and preprocessing script
pipeline.start(
    execution_display_name="Credit",
    parameters=dict(
        InputData=credit_data_uri,
        PreprocessScript=credit_preprocess_uri,
        EvaluateScript=evaluate_script_uri,
        AccuracyConditionThreshold=0.7,
        MaxiumParallelTrainingJobs=3,
        MaxiumTrainingJobs=42,
        ModelGroup=credit_model_group,
    ),
)
[ ]:
# Start pipeline with customer churn data and preprocessing script
pipeline.start(
    execution_display_name="Churn",
    parameters=dict(
        InputData=customer_churn_data_uri,
        PreprocessScript=churn_preprocess_uri,
        EvaluateScript=evaluate_script_uri,
        AccuracyConditionThreshold=0.4,
        MaxiumParallelTrainingJobs=1,
        MaxiumTrainingJobs=2,
        ModelGroup=churn_model_group,
    ),
)
[ ]:
# Start pipeline with customer churn data and preprocessing script
pipeline.start(
    execution_display_name="Churn",
    parameters=dict(
        InputData=customer_churn_data_uri,
        PreprocessScript=churn_preprocess_uri,
        EvaluateScript=evaluate_script_uri,
        AccuracyConditionThreshold=0.8,
        MaxiumParallelTrainingJobs=4,
        MaxiumTrainingJobs=40,
        ModelGroup=churn_model_group,
    ),
)

Visualize model performance metrics

Once the pipelines have completed successfully, metrics attached to the model version can be visualized. In SageMaker Studio, choose SageMaker Components and registries in the left pane and under Model registry, select one of the model package that was created. Select both versions and right-click. Choose Compare model versions.

The screenshot below shows what comparing the customer churn model versions looks like. Note that the standard deviation shows as NaN since it is not relevant to this model’s calculated metrics.

Compare churn model versions

The screenshot below shows what comparing the credit risk model versions looks like.

Compare credit risk versions

Clean up (optional)

Delete the model registries and the pipeline to keep the Studio environment tidy.

[ ]:
def delete_model_package_group(sm_client, package_group_name):
    try:
        model_versions = sm_client.list_model_packages(ModelPackageGroupName=package_group_name)

    except Exception as e:
        print("{} \n".format(e))
        return

    for model_version in model_versions["ModelPackageSummaryList"]:
        try:
            sm_client.delete_model_package(ModelPackageName=model_version["ModelPackageArn"])
        except Exception as e:
            print("{} \n".format(e))
        time.sleep(0.5)  # Ensure requests aren't throttled

    try:
        sm_client.delete_model_package_group(ModelPackageGroupName=package_group_name)
        print("{} model package group deleted".format(package_group_name))
    except Exception as e:
        print("{} \n".format(e))
    return


def delete_sagemaker_pipeline(sm_client, pipeline_name):
    try:
        sm_client.delete_pipeline(
            PipelineName=pipeline_name,
        )
        print("{} pipeline deleted".format(pipeline_name))
    except Exception as e:
        print("{} \n".format(e))
        return
[ ]:
import boto3
import time

client = boto3.client("sagemaker")

registries = [credit_model_group, churn_model_group]

for registry in registries:
    delete_model_package_group(client, registry)

delete_sagemaker_pipeline(client, pipeline_name)

Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.

This us-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This us-east-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This us-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This ca-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This sa-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This eu-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This eu-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This eu-west-3 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This eu-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This eu-north-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This ap-southeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This ap-southeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This ap-northeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This ap-northeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable

This ap-south-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable