Basic Pipeline for Batch Inference using Low-code Experience for SageMaker Pipelines



This notebook shows how to orchestrate model building and batch inference jobs with the low-code experience for SageMaker Pipelines, using the @step decorator. We build an automated model building pipeline for a classification problem: predicting whether breast cancer diagnostic data indicates a benign or malignant tumor. The model building pipeline includes a preprocessing step, a training step, an evaluation step, and a model registration step. We also create a second pipeline to perform batch inference.

Dataset

We use the Breast Cancer Wisconsin (Diagnostic) dataset:

Wolberg, William, Mangasarian, Olvi, Street, Nick, and Street, W. (1995). Breast Cancer Wisconsin (Diagnostic). UCI Machine Learning Repository. https://doi.org/10.24432/C5DW2B.

The dataset can be downloaded from the UCI Machine Learning Repository: https://archive.ics.uci.edu/dataset/17/breast+cancer+wisconsin+diagnostic

Notebook Preparation

In this section we install the library dependencies used in this notebook and initialize our SageMaker session.

[ ]:
%pip install -U boto3
[ ]:
import os
import boto3
[ ]:
%pip install -r ./requirements.txt

We can use the configuration file config.yaml to set default infrastructure values, such as the instance type and the dependencies needed to run the pipeline. The environment variable “SAGEMAKER_USER_CONFIG_OVERRIDE” tells the SDK where to find the configuration; here we point it at the current working directory, which is expected to contain config.yaml.
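For orientation, the snippet below writes a minimal configuration file of the kind described here. The key names follow the SageMaker Python SDK defaults schema as we understand it, and the values (instance type, requirements path) are assumptions for illustration; the config.yaml shipped with this notebook may differ. The helper name write_example_config is hypothetical.

```python
from pathlib import Path

# Assumed contents of a minimal config.yaml for @step / remote functions.
# Keys are our reading of the SDK defaults schema; values are illustrative only.
CONFIG_TEXT = """\
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        InstanceType: ml.m5.xlarge
        Dependencies: ./requirements.txt
        IncludeLocalWorkDir: true
"""


def write_example_config(directory):
    """Write the example config.yaml into `directory` and return its path."""
    path = Path(directory) / "config.yaml"
    path.write_text(CONFIG_TEXT, encoding="utf-8")
    return path
```

Writing the file into a scratch directory first and inspecting it is a low-risk way to check the layout before replacing a real configuration.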

[ ]:
# Set path to config file
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()
[ ]:
import sagemaker
from sagemaker.workflow.function_step import step
from sagemaker.workflow.parameters import ParameterString

sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name

Define variables and pipeline parameters

[ ]:
# Location of our dataset
input_path = f"s3://sagemaker-example-files-prod-{region}/datasets/tabular/breast_cancer/wdbc.csv"
[ ]:
pipeline_name = "lowcode-breast-cancer-xgb"
model_package_group_name = "lowcode-breast-cancer-xgb"
[ ]:
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Preprocessing Step

The Breast Cancer Wisconsin dataset contains an id column, which we do not use for training. The second column, diagnosis, is the class label: ‘M’ for the malignant class and ‘B’ for the benign class.

In the preprocessing step, we drop the id column, then split the dataset into three distinct sets: train (80%), validation (10%), and test (10%).
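As a quick check of what this split produces, the sketch below works out the row counts for the two successive splits used in the preprocessing function (20% held out, then that part halved). It assumes the file has 569 rows, the instance count listed for this dataset, and that scikit-learn rounds a fractional held-out size up; split_sizes is a hypothetical helper, not part of any library.

```python
import math


def split_sizes(n_rows, test_size):
    """Return (kept, held_out) row counts for a fractional test_size.

    Assumption: the held-out part is rounded up, as scikit-learn does
    for a float test_size when train_size is left unset.
    """
    held_out = math.ceil(test_size * n_rows)
    return n_rows - held_out, held_out


# First split: 80% train, 20% held out.
n_train, n_held_out = split_sizes(569, 0.2)
# Second split: the held-out part is halved into validation and test.
n_validation, n_test = split_sizes(n_held_out, 0.5)

print(n_train, n_validation, n_test)  # 455 57 57
```

With only 57 rows each in the validation and test sets, single-run metrics will be noisy, which is worth keeping in mind when reading the evaluation report.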

Note that the keep_alive_period_in_seconds parameter of the @step decorator sets how many seconds the instance is kept alive after the step finishes, waiting to be reused by the next pipeline step. Setting it speeds up pipeline execution because fewer new instances need to be launched.

[ ]:
random_state = 2023
label_column = "diagnosis"

feature_names = [
    "id",
    "diagnosis",
    "radius_mean",
    "texture_mean",
    "perimeter_mean",
    "area_mean",
    "smoothness_mean",
    "compactness_mean",
    "concavity_mean",
    "concave points_mean",
    "symmetry_mean",
    "fractal_dimension_mean",
    "radius_se",
    "texture_se",
    "perimeter_se",
    "area_se",
    "smoothness_se",
    "compactness_se",
    "concavity_se",
    "concave points_se",
    "symmetry_se",
    "fractal_dimension_se",
    "radius_worst",
    "texture_worst",
    "perimeter_worst",
    "area_worst",
    "smoothness_worst",
    "compactness_worst",
    "concavity_worst",
    "concave points_worst",
    "symmetry_worst",
    "fractal_dimension_worst",
]


@step(
    name="data-preprocessing",
    instance_type=instance_type,
    keep_alive_period_in_seconds=300,
)
def preprocess(raw_data_s3_path: str, output_prefix: str) -> tuple:
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(raw_data_s3_path, header=None, names=feature_names)
    df.drop(columns="id", inplace=True)

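    # Seed both splits so every pipeline run produces the same partitions
    train_df, test_df = train_test_split(
        df, test_size=0.2, stratify=df[label_column], random_state=random_state
    )
    validation_df, test_df = train_test_split(
        test_df, test_size=0.5, stratify=test_df[label_column], random_state=random_state
    )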
    train_df.reset_index(inplace=True, drop=True)
    validation_df.reset_index(inplace=True, drop=True)
    test_df.reset_index(inplace=True, drop=True)

    train_s3_path = f"s3://{bucket}/{output_prefix}/train.csv"
    val_s3_path = f"s3://{bucket}/{output_prefix}/val.csv"
    test_s3_path = f"s3://{bucket}/{output_prefix}/test.csv"

    train_df.to_csv(train_s3_path, index=False)
    validation_df.to_csv(val_s3_path, index=False)
    test_df.to_csv(test_s3_path, index=False)

    return train_s3_path, val_s3_path, test_s3_path

Training Step

We train an XGBoost model in this step, using a @step-decorated function that takes the S3 paths of the training and validation sets along with the XGBoost hyperparameters. Both S3 paths come from the output of the preprocessing step.

[ ]:
use_gpu = False
param = dict(
    objective="binary:logistic",
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    tree_method="gpu_hist" if use_gpu else "hist",  # GPU-accelerated histogram algorithm only when use_gpu is True
)
num_round = 50


@step(
    name="model-training",
    instance_type=instance_type,
    keep_alive_period_in_seconds=300,
)
def train(
    train_s3_path: str,
    validation_s3_path: str,
    param: dict = param,
    num_round: int = num_round,
):
    import pandas as pd
    from xgboost import XGBClassifier

    # read data files from S3
    train_df = pd.read_csv(train_s3_path)
    validation_df = pd.read_csv(validation_s3_path)

    # create dataframe and label series
    y_train = (train_df.pop(label_column) == "M").astype("int")
    y_validation = (validation_df.pop(label_column) == "M").astype("int")

    xgb = XGBClassifier(n_estimators=num_round, **param)
    xgb.fit(
        train_df,
        y_train,
        eval_set=[(validation_df, y_validation)],
        early_stopping_rounds=5,
    )

    return xgb

Evaluation Step

In this step, we create a @step-decorated function to evaluate the trained XGBoost model on the test dataset.

[ ]:
@step(
    name="model-evaluation",
    instance_type=instance_type,
    keep_alive_period_in_seconds=300,
)
def evaluate(model, test_s3_path: str) -> dict:
    import json
    import numpy as np
    import pandas as pd
    from sklearn.metrics import (
        accuracy_score,
        auc,
        confusion_matrix,
        f1_score,
        precision_score,
        recall_score,
        roc_curve,
    )

    test_df = pd.read_csv(test_s3_path)
    y_test = (test_df.pop(label_column) == "M").astype("int")

    prediction_probabilities = model.predict_proba(test_df)
    predictions = np.argmax(prediction_probabilities, axis=1)

    acc = accuracy_score(y_test, predictions)
    precision = precision_score(y_test, predictions, zero_division=1)
    recall = recall_score(y_test, predictions)
    f1 = f1_score(y_test, predictions)
    conf_matrix = confusion_matrix(y_test, predictions)
    fpr, tpr, _ = roc_curve(y_test, prediction_probabilities[:, 1])
    auc_value = auc(fpr, tpr)

    report_dict = {
        "binary_classification_metrics": {
            "accuracy": {"value": acc, "standard_deviation": "NaN"},
            "f1": {"value": f1, "standard_deviation": "NaN"},
            "precision": {"value": precision, "standard_deviation": "NaN"},
            "recall": {"value": recall, "standard_deviation": "NaN"},
            "confusion_matrix": {
                "0": {"0": int(conf_matrix[0][0]), "1": int(conf_matrix[0][1])},
                "1": {"0": int(conf_matrix[1][0]), "1": int(conf_matrix[1][1])},
            },
            "receiver_operating_characteristic_curve": {
                "false_positive_rates": list(fpr),
                "true_positive_rates": list(tpr),
            },
            "auc": {"value": auc_value, "standard_deviation": "NaN"},
        },
    }

    print(f"evaluation report: {json.dumps(report_dict, indent=2)}")
    return report_dict
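To make the numbers in the evaluation report easier to interpret, the sketch below derives accuracy, precision, recall, and F1 directly from the four confusion-matrix counts. The counts in the usage example are hypothetical, not results from this pipeline, and binary_metrics is an illustrative helper rather than a library function. The layout follows scikit-learn's convention (rows are true labels, columns are predictions), with 1 standing for the malignant class.

```python
def binary_metrics(tn, fp, fn, tp):
    """Compute common binary-classification metrics from confusion-matrix counts."""
    total = tn + fp + fn + tp
    # Precision falls back to 1.0 with no positive predictions,
    # mirroring zero_division=1 in the evaluation step above.
    precision = tp / (tp + fp) if (tp + fp) else 1.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1_denominator = 2 * tp + fp + fn
    f1 = 2 * tp / f1_denominator if f1_denominator else 0.0
    return {
        "accuracy": (tp + tn) / total,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


# Hypothetical counts for a 57-row test set (illustration only).
metrics = binary_metrics(tn=35, fp=1, fn=2, tp=19)
for name, value in metrics.items():
    print(f"{name}: {value:.3f}")
```

For this problem recall on the malignant class is usually the metric to watch, since a false negative (a malignant tumor predicted as benign) is the more costly error.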

Model Registration Step

In this step, we register the trained model in the SageMaker Model Registry. We use ModelBuilder to build the model artifacts for inference.

[ ]:
@step(
    name="model-registration",
    instance_type=instance_type,
    keep_alive_period_in_seconds=300,
)
def register(model, evaluation, model_approval_status, sample_data):
    import json
    import numpy as np
    import pandas as pd
    import s3fs
    from pathlib import Path
    from sagemaker import MetricsSource, ModelMetrics
    from sagemaker.serve.builder.model_builder import ModelBuilder
    from sagemaker.serve.builder.schema_builder import SchemaBuilder
    from sagemaker.serve.spec.inference_spec import InferenceSpec
    from sagemaker.utils import unique_name_from_base
    from xgboost import XGBClassifier

    class XGBoostSpec(InferenceSpec):
        def load(self, model_dir: str):
            print(model_dir)
            model = XGBClassifier()
            model.load_model(model_dir + "/xgboost-model")
            return model

        def invoke(self, input_object: object, model: object):
            prediction_probabilities = model.predict_proba(input_object)
            predictions = np.argmax(prediction_probabilities, axis=1)
            return predictions

    # Upload evaluation report to s3
    eval_file_name = unique_name_from_base("evaluation")
    eval_report_s3_uri = (
        f"s3://{bucket}/{model_package_group_name}/evaluation-report/{eval_file_name}.json"
    )
    s3_fs = s3fs.S3FileSystem()
    eval_report_str = json.dumps(evaluation)
    with s3_fs.open(eval_report_s3_uri, "wb") as file:
        file.write(eval_report_str.encode("utf-8"))

    # Create model_metrics as per evaluation report in s3
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=eval_report_s3_uri,
            content_type="application/json",
        )
    )

    sample_data = pd.read_csv(sample_data, nrows=10)
    sample_data.pop(label_column)

    schema_builder = SchemaBuilder(
        sample_input=sample_data.to_numpy(),
        sample_output=model.predict(sample_data),
    )

    model_path = Path("/tmp/model/")
    model_path.mkdir(parents=True, exist_ok=True)
    model.save_model(model_path / "xgboost-model")

    # Build the trained model and register it
    model_builder = ModelBuilder(
        model_path=str(model_path),
        inference_spec=XGBoostSpec(),
        schema_builder=schema_builder,
        role_arn=role,
        s3_model_data_url=f"s3://{bucket}/{model_package_group_name}/model-artifacts",
    )
    model_package = model_builder.build().register(
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
    )

    print(f"Registered Model Package ARN: {model_package.model_package_arn}")
    return model_package.model_package_arn

Putting everything together: creating the Pipeline and running the pipeline execution

We connect all defined pipeline @step functions into a multi-step pipeline. Then, we submit and execute the pipeline.
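Only the final step is passed to Pipeline in the cell that follows, because each @step call returns a placeholder for its future output and the upstream steps can be discovered from those placeholders. The toy model below illustrates that idea in plain Python. It is a conceptual sketch only: Delayed, toy_step, and collect_steps are hypothetical names, and this is not how the SageMaker SDK is implemented.

```python
class Delayed:
    """Placeholder for the future output of a step (toy stand-in, not the SDK class)."""

    def __init__(self, name, inputs=()):
        self.name = name
        self.inputs = tuple(inputs)

    def __getitem__(self, index):
        # One element of a step's tuple output still depends on that same step.
        return self


def toy_step(name):
    """Decorator that records dependencies instead of running the function."""

    def decorator(func):
        def wrapper(*args, **kwargs):
            values = list(args) + list(kwargs.values())
            return Delayed(name, [v for v in values if isinstance(v, Delayed)])

        return wrapper

    return decorator


def collect_steps(leaf):
    """Walk upstream from the last step and list step names in execution order."""
    ordered = []

    def visit(node):
        for upstream in node.inputs:
            visit(upstream)
        if node.name not in ordered:
            ordered.append(node.name)

    visit(leaf)
    return ordered


@toy_step("data-preprocessing")
def toy_preprocess(): ...


@toy_step("model-training")
def toy_train(train_path, validation_path): ...


@toy_step("model-evaluation")
def toy_evaluate(model, test_path): ...


@toy_step("model-registration")
def toy_register(model, evaluation, sample_data): ...


data = toy_preprocess()
model = toy_train(data[0], data[1])
report = toy_evaluate(model, data[2])
registered = toy_register(model, report, data[2])

print(collect_steps(registered))
# ['data-preprocessing', 'model-training', 'model-evaluation', 'model-registration']
```

The toy functions carry a toy_ prefix so that running this cell in the notebook does not overwrite the real step functions defined earlier.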

[ ]:
from sagemaker.workflow.pipeline import Pipeline

delayed_data = preprocess(
    raw_data_s3_path=input_path,
    output_prefix=f"{pipeline_name}/dataset",
)
delayed_model = train(train_s3_path=delayed_data[0], validation_s3_path=delayed_data[1])
delayed_evaluation = evaluate(model=delayed_model, test_s3_path=delayed_data[2])
delayed_register = register(
    model=delayed_model,
    evaluation=delayed_evaluation,
    model_approval_status=model_approval_status,
    sample_data=delayed_data[2],
)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        instance_type,
        model_approval_status,
    ],
    steps=[
        delayed_register,
    ],
)
[ ]:
pipeline.upsert(role_arn=role)
[ ]:
execution = pipeline.start()
[ ]:
execution.describe()
[ ]:
execution.wait()
[ ]:
execution.list_steps()

Using Trained Model in Model Registry to perform Batch Inference

The pipeline execution automatically runs preprocessing, training, evaluation, and registration of the trained model in the model registry. Model versions in the model registry are candidates for deployment, and we can approve a model version after the pipeline execution completes.

The following snippet retrieves the ARN of the model package registered by this execution and updates its status to "Approved".

[ ]:
sm = boto3.client("sagemaker")

# Get the ARN returned by the model-registration step of this execution
model_package_arn = execution.result(step_name="model-registration")
model_package_update_input_dict = {
    "ModelPackageArn": model_package_arn,
    "ModelApprovalStatus": "Approved",
}
model_package_update_response = sm.update_model_package(**model_package_update_input_dict)
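If the execution object is no longer available (for example, in a new notebook session), the most recent model version can instead be looked up in the model package group. The helper below is a sketch of that approach; latest_model_package_arn is a hypothetical name, and it assumes the client passed in is a boto3 SageMaker client such as sm above.

```python
def latest_model_package_arn(sm_client, group_name, approval_status=None):
    """Return the ARN of the newest model package in a group, or None if the group is empty.

    sm_client is assumed to be a boto3 SageMaker client. approval_status
    optionally restricts the lookup, e.g. to "Approved" versions only.
    """
    request = {
        "ModelPackageGroupName": group_name,
        "SortBy": "CreationTime",
        "SortOrder": "Descending",
        "MaxResults": 1,
    }
    if approval_status is not None:
        request["ModelApprovalStatus"] = approval_status

    summaries = sm_client.list_model_packages(**request)["ModelPackageSummaryList"]
    return summaries[0]["ModelPackageArn"] if summaries else None
```

A call such as latest_model_package_arn(sm, model_package_group_name, approval_status="Approved") would then give the batch inference pipeline the newest approved version rather than whichever version the last execution happened to register.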

We can use describe_model_package() to see the details of the approved model package.

[ ]:
sm.describe_model_package(ModelPackageName=model_package_arn)

The "ModelDataUrl" inside InferenceSpecification indicates the location of model artifacts. We will use the model artifact to load the model for batch inference.

[ ]:
model_package = sm.describe_model_package(ModelPackageName=model_package_arn)
model_artifact_s3_path = model_package["InferenceSpecification"]["Containers"][0]["ModelDataUrl"]

We can create another pipeline for batch inference using the approved model version. In this example, we take the original dataset (with ID and labels removed) as the dataset to be predicted.

[ ]:
# create a dataset for batch prediction
import pandas as pd

batch_dataset_filename = "wdbc_to_be_predicted.csv"
data_df = pd.read_csv(
    input_path,
    header=None,
    names=feature_names,
)
data_df = data_df.drop(
    data_df.columns[[0, 1]], axis=1
)  # Remove the first two columns (ID and Labels)
data_df.to_csv(batch_dataset_filename, index=False)
batch_dataset_s3_path = sagemaker_session.upload_data(
    path=batch_dataset_filename,
    key_prefix=f"{pipeline_name}/dataset/",
)
[ ]:
inference_pipeline_name = "lowcode-breast-cancer-xgb-inference"
[ ]:
@step(
    name="batch-inference",
    instance_type=instance_type,
    keep_alive_period_in_seconds=300,
)
def batch_inference(
    model_package_s3_path: str,
    dataset_s3_path: str,
    output_s3_path: str,
):
    import numpy as np
    import pandas as pd
    import s3fs
    import tarfile
    from xgboost import XGBClassifier

    package_filename = "serve.tar.gz"
    s3_fs = s3fs.S3FileSystem()
    s3_fs.get_file(model_package_s3_path, package_filename)
    with tarfile.open(package_filename) as tar:
        tar.extractall(path=".")
    model = XGBClassifier()
    model.load_model("xgboost-model")

    df = pd.read_csv(dataset_s3_path)
    prediction_probabilities = model.predict_proba(df)
    predictions = np.argmax(prediction_probabilities, axis=1)
    with s3_fs.open(output_s3_path, "wb") as file:
        file.write("\n".join(np.char.mod("%d", predictions)).encode("utf-8"))
[ ]:
from sagemaker.workflow.pipeline import Pipeline

delayed_inference = batch_inference(
    model_artifact_s3_path,
    batch_dataset_s3_path,
    f"s3://{bucket}/{inference_pipeline_name}/output/predictions.txt",
)

batch_inference_pipeline = Pipeline(
    name=inference_pipeline_name,
    parameters=[
        instance_type,
    ],
    steps=[
        delayed_inference,
    ],
)
[ ]:
batch_inference_pipeline.upsert(role_arn=role)
[ ]:
execution = batch_inference_pipeline.start()
[ ]:
execution.describe()
[ ]:
execution.wait()
[ ]:
execution.list_steps()
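The batch inference step writes one integer per line to the output file. Because the training step encoded the label as 1 for "M" and 0 for "B", the predictions can be mapped back to diagnosis codes as sketched below. The decode_predictions helper is hypothetical, and the sample text is made up for illustration; in practice the text would be read from the predictions.txt object in S3.

```python
# Mapping implied by the training step, which encodes diagnosis == "M" as 1.
LABELS = {0: "B", 1: "M"}


def decode_predictions(text):
    """Convert newline-separated 0/1 predictions into diagnosis codes."""
    return [LABELS[int(line)] for line in text.splitlines() if line.strip()]


# Made-up sample content in the same format as the pipeline output.
sample_output = "1\n0\n0\n1"
print(decode_predictions(sample_output))  # ['M', 'B', 'B', 'M']
```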

Clean up Resources

When you are finished with the notebook, you can delete unused resources such as the model packages and the pipelines. The following cells are example clean-up code; adjust them to match your variable names.

Delete model package

[ ]:
paginator = sm.get_paginator("list_model_packages")
for page in paginator.paginate(ModelPackageGroupName=model_package_group_name):
    for model_package in page["ModelPackageSummaryList"]:
        print(model_package["ModelPackageArn"])
        sm.delete_model_package(ModelPackageName=model_package["ModelPackageArn"])

sm.delete_model_package_group(ModelPackageGroupName=model_package_group_name)

Delete pipeline

[ ]:
pipeline.delete()
batch_inference_pipeline.delete()
