Basic Pipeline for Batch Inference using Low-code Experience for SageMaker Pipelines
This notebook’s CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.
This notebook shows an example of orchestrating model building and batch inference jobs with the low-code experience for SageMaker Pipelines, using the @step decorator. We build an automated model building pipeline for a classification problem: predicting whether breast cancer diagnostic data indicate a benign or a malignant tumor. The model building pipeline includes preprocessing, training, evaluation, and model registration steps. We also create a second pipeline to perform batch inference.
Dataset
We use the Breast Cancer Wisconsin (Diagnostic) dataset:
Wolberg, William, Mangasarian, Olvi, Street, Nick, and Street, W. (1995). Breast Cancer Wisconsin (Diagnostic). UCI Machine Learning Repository. https://doi.org/10.24432/C5DW2B.
The dataset can be downloaded from the UCI Machine Learning Repository: https://archive.ics.uci.edu/dataset/17/breast+cancer+wisconsin+diagnostic
Notebook Preparation
In this section, we install and prepare the library dependencies used in this notebook and initialize our SageMaker session.
[ ]:
%pip install -U boto3
[ ]:
import os
import boto3
[ ]:
%pip install -r ./requirements.txt
We can use the configuration file config.yaml to set default values for the infrastructure, such as the instance type, and for the dependencies needed to run the pipeline. The environment variable SAGEMAKER_USER_CONFIG_OVERRIDE tells the SageMaker Python SDK where to find this file; here we point it at the current working directory, which contains config.yaml.
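For reference, here is a minimal sketch that writes one possible config.yaml from Python. The key layout (SchemaVersion, then InstanceType, Dependencies and IncludeLocalWorkDir under SageMaker > PythonSDK > Modules > RemoteFunction) is an assumption based on the SageMaker Python SDK configuration schema, and the values only mirror this notebook; compare them with the config.yaml you actually use.

```python
from pathlib import Path

# Assumed keys from the SageMaker Python SDK configuration schema for remote
# functions (also used by @step); the values mirror this notebook and are illustrative.
CONFIG_YAML = """\
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        InstanceType: ml.m5.xlarge
        Dependencies: ./requirements.txt
        IncludeLocalWorkDir: true
"""


def write_sagemaker_config(directory: str) -> Path:
    """Write CONFIG_YAML to <directory>/config.yaml and return the file path."""
    config_path = Path(directory) / "config.yaml"
    config_path.write_text(CONFIG_YAML, encoding="utf-8")
    return config_path
```

Calling write_sagemaker_config(os.getcwd()) overwrites any existing config.yaml in the working directory, so only use it if you do not already have one.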
[ ]:
# Point the SDK at the directory that contains config.yaml
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()
[ ]:
import sagemaker
from sagemaker.workflow.function_step import step
from sagemaker.workflow.parameters import ParameterString
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name
Define variables and pipeline parameters
[ ]:
# Location of our dataset
input_path = f"s3://sagemaker-example-files-prod-{region}/datasets/tabular/breast_cancer/wdbc.csv"
[ ]:
pipeline_name = "lowcode-breast-cancer-xgb"
model_package_group_name = "lowcode-breast-cancer-xgb"
[ ]:
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
Preprocessing Step
The breast cancer Wisconsin dataset contains an id column, which we do not use for training. The second column, diagnosis, is the class label: 'M' for the malignant class and 'B' for the benign class.
In the preprocessing step, we drop the id column, then split the dataset into three distinct sets: train (80%), validation (10%), and test (10%).
Note that the keep_alive_period_in_seconds parameter of the @step decorator sets how many seconds an instance is kept alive after a step finishes, waiting to be reused by the next pipeline step execution (SageMaker managed warm pools). Setting this parameter speeds up pipeline execution because fewer new instances have to be launched for the pipeline steps.
[ ]:
random_state = 2023
label_column = "diagnosis"
feature_names = [
    "id",
    "diagnosis",
    "radius_mean",
    "texture_mean",
    "perimeter_mean",
    "area_mean",
    "smoothness_mean",
    "compactness_mean",
    "concavity_mean",
    "concave points_mean",
    "symmetry_mean",
    "fractal_dimension_mean",
    "radius_se",
    "texture_se",
    "perimeter_se",
    "area_se",
    "smoothness_se",
    "compactness_se",
    "concavity_se",
    "concave points_se",
    "symmetry_se",
    "fractal_dimension_se",
    "radius_worst",
    "texture_worst",
    "perimeter_worst",
    "area_worst",
    "smoothness_worst",
    "compactness_worst",
    "concavity_worst",
    "concave points_worst",
    "symmetry_worst",
    "fractal_dimension_worst",
]
@step(
    name="data-preprocessing",
    instance_type=instance_type,
    keep_alive_period_in_seconds=300,
)
def preprocess(raw_data_s3_path: str, output_prefix: str) -> tuple:
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(raw_data_s3_path, header=None, names=feature_names)
    df.drop(columns="id", inplace=True)

    # 80% train; the remaining 20% is split evenly into validation and test.
    # Stratify on the label so every split keeps the M/B class ratio, and pass
    # random_state so the split is reproducible across pipeline executions.
    train_df, test_df = train_test_split(
        df, test_size=0.2, stratify=df[label_column], random_state=random_state
    )
    validation_df, test_df = train_test_split(
        test_df, test_size=0.5, stratify=test_df[label_column], random_state=random_state
    )
    train_df.reset_index(inplace=True, drop=True)
    validation_df.reset_index(inplace=True, drop=True)
    test_df.reset_index(inplace=True, drop=True)

    # Write the three splits to S3 and return their paths
    train_s3_path = f"s3://{bucket}/{output_prefix}/train.csv"
    val_s3_path = f"s3://{bucket}/{output_prefix}/val.csv"
    test_s3_path = f"s3://{bucket}/{output_prefix}/test.csv"
    train_df.to_csv(train_s3_path, index=False)
    validation_df.to_csv(val_s3_path, index=False)
    test_df.to_csv(test_s3_path, index=False)
    return train_s3_path, val_s3_path, test_s3_path
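Because preprocess first holds out 20% of the rows and then splits that hold-out in half, the result is roughly an 80/10/10 split. The sketch below reproduces the row counts, assuming scikit-learn's rounding rule for a float test_size (the held-out part gets the ceiling of test_size times the row count) and the 569 rows of the WDBC dataset. Stratification preserves the class ratio within each split but does not change these totals.

```python
import math


def split_sizes(n_samples: int, test_size: float = 0.2) -> tuple:
    """Return (train, validation, test) row counts for the two-stage split.

    Assumes scikit-learn's rule: held-out rows = ceil(test_size * n),
    and the remainder goes to the training side of the split.
    """
    n_holdout = math.ceil(test_size * n_samples)
    n_train = n_samples - n_holdout
    # Second split (test_size=0.5): the "test" side stays test, and the
    # "train" side of that call becomes the validation set.
    n_test = math.ceil(0.5 * n_holdout)
    n_validation = n_holdout - n_test
    return n_train, n_validation, n_test


print(split_sizes(569))  # (455, 57, 57)
```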
Training Step
In this training step, we train an XGBoost model with a @step-decorated function that takes the S3 paths of the training and validation sets, along with the XGBoost hyperparameters. Both S3 paths come from the output of the previous step. The validation set is used for early stopping: training stops when the validation metric has not improved for 5 consecutive boosting rounds.
[ ]:
use_gpu = False
param = dict(
    objective="binary:logistic",
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    # "gpu_hist" requires a GPU instance type; "hist" runs on CPU
    tree_method="gpu_hist" if use_gpu else "hist",
)
num_round = 50
@step(
    name="model-training",
    instance_type=instance_type,
    keep_alive_period_in_seconds=300,
)
def train(
    train_s3_path: str,
    validation_s3_path: str,
    param: dict = param,
    num_round: int = num_round,
):
    import pandas as pd
    from xgboost import XGBClassifier

    # Read the data files from S3
    train_df = pd.read_csv(train_s3_path)
    validation_df = pd.read_csv(validation_s3_path)

    # Split off the label column and encode it: 1 = malignant ("M"), 0 = benign ("B")
    y_train = (train_df.pop(label_column) == "M").astype("int")
    y_validation = (validation_df.pop(label_column) == "M").astype("int")

    # early_stopping_rounds is a constructor argument; passing it to fit() is
    # deprecated since XGBoost 1.6 and not accepted by newer releases
    xgb = XGBClassifier(n_estimators=num_round, early_stopping_rounds=5, **param)
    xgb.fit(
        train_df,
        y_train,
        eval_set=[(validation_df, y_validation)],
    )
    return xgb
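Early stopping uses a simple patience rule: remember the round with the best validation loss and stop once a fixed number of consecutive rounds pass without beating it. The stdlib sketch below is a simplified illustration of that rule, not XGBoost's own callback; the loss values are made up. The fitted XGBClassifier records the best round as its best_iteration attribute.

```python
def early_stopping_round(val_losses: list, patience: int = 5) -> tuple:
    """Return (best_round, stopped_round) for per-round validation losses.

    Training stops once `patience` consecutive rounds fail to improve on the
    best loss seen so far. Rounds are 0-indexed; if the rule never triggers,
    the last round is reported as the stopping round.
    """
    best_round, best_loss = 0, float("inf")
    for round_idx, loss in enumerate(val_losses):
        if loss < best_loss:
            best_round, best_loss = round_idx, loss
        elif round_idx - best_round >= patience:
            return best_round, round_idx
    return best_round, len(val_losses) - 1


# Made-up validation log loss per boosting round
losses = [0.60, 0.45, 0.38, 0.36, 0.37, 0.365, 0.37, 0.38, 0.39, 0.40, 0.41]
print(early_stopping_round(losses, patience=5))  # (3, 8)
```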
Evaluation Step
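In this step, we create a @step-decorated function that evaluates the trained XGBoost model on the test dataset and returns an evaluation report with accuracy, precision, recall, F1 score, the confusion matrix, the ROC curve, and AUC.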
[ ]:
@step(
    name="model-evaluation",
    instance_type=instance_type,
    keep_alive_period_in_seconds=300,
)
def evaluate(model, test_s3_path: str) -> dict:
    import json
    import numpy as np
    import pandas as pd
    from sklearn.metrics import (
        accuracy_score,
        auc,
        confusion_matrix,
        f1_score,
        precision_score,
        recall_score,
        roc_curve,
    )

    test_df = pd.read_csv(test_s3_path)
    y_test = (test_df.pop(label_column) == "M").astype("int")

    prediction_probabilities = model.predict_proba(test_df)
    predictions = np.argmax(prediction_probabilities, axis=1)

    acc = accuracy_score(y_test, predictions)
    precision = precision_score(y_test, predictions, zero_division=1)
    recall = recall_score(y_test, predictions)
    f1 = f1_score(y_test, predictions)
    conf_matrix = confusion_matrix(y_test, predictions)
    fpr, tpr, _ = roc_curve(y_test, prediction_probabilities[:, 1])
    auc_value = auc(fpr, tpr)

    report_dict = {
        "binary_classification_metrics": {
            "accuracy": {"value": acc, "standard_deviation": "NaN"},
            "f1": {"value": f1, "standard_deviation": "NaN"},
            "precision": {"value": precision, "standard_deviation": "NaN"},
            "recall": {"value": recall, "standard_deviation": "NaN"},
            "confusion_matrix": {
                "0": {"0": int(conf_matrix[0][0]), "1": int(conf_matrix[0][1])},
                "1": {"0": int(conf_matrix[1][0]), "1": int(conf_matrix[1][1])},
            },
            "receiver_operating_characteristic_curve": {
                "false_positive_rates": list(fpr),
                "true_positive_rates": list(tpr),
            },
            "auc": {"value": auc_value, "standard_deviation": "NaN"},
        },
    }
    print(f"evaluation report: {json.dumps(report_dict, indent=2)}")
    return report_dict
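All of the threshold-based metrics in the report follow from the four cells of the confusion matrix, where scikit-learn puts true labels in rows and predictions in columns, i.e. [[TN, FP], [FN, TP]]. AUC is the area under the ROC curve by the trapezoidal rule, which is what sklearn.metrics.auc computes. The stdlib sketch below recomputes them by hand as a cross-check; the counts in the usage line are hypothetical, not results from this notebook.

```python
def binary_metrics(tn: int, fp: int, fn: int, tp: int) -> dict:
    """Accuracy, precision, recall and F1 from a 2x2 confusion matrix.

    Precision falls back to 1.0 when nothing is predicted positive, like
    precision_score(..., zero_division=1) in evaluate(); recall and F1 fall back to 0.0.
    """
    total = tn + fp + fn + tp
    precision = tp / (tp + fp) if tp + fp else 1.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * tp / (2 * tp + fp + fn) if 2 * tp + fp + fn else 0.0
    return {"accuracy": (tp + tn) / total, "precision": precision, "recall": recall, "f1": f1}


def trapezoid_auc(fpr: list, tpr: list) -> float:
    """Area under the ROC curve, summing trapezoids between consecutive points."""
    area = 0.0
    for i in range(1, len(fpr)):
        area += (fpr[i] - fpr[i - 1]) * (tpr[i] + tpr[i - 1]) / 2
    return area


print(binary_metrics(tn=35, fp=1, fn=2, tp=19))  # hypothetical counts
print(trapezoid_auc([0.0, 0.0, 1.0], [0.0, 1.0, 1.0]))  # 1.0 (perfect ranking)
```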
Model Registration Step
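In this step, we register the trained model in the SageMaker Model Registry. We use ModelBuilder, together with a custom InferenceSpec (XGBoostSpec) that defines how to load the model and run predictions, to build the model artifacts for inference. The evaluation report is uploaded to S3 and attached to the model version as model metrics.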
[ ]:
@step(
    name="model-registration",
    instance_type=instance_type,
    keep_alive_period_in_seconds=300,
)
def register(model, evaluation, model_approval_status, sample_data):
    import json
    import numpy as np
    import pandas as pd
    import s3fs
    from pathlib import Path
    from sagemaker import MetricsSource, ModelMetrics
    from sagemaker.serve.builder.model_builder import ModelBuilder
    from sagemaker.serve.builder.schema_builder import SchemaBuilder
    from sagemaker.serve.spec.inference_spec import InferenceSpec
    from sagemaker.utils import unique_name_from_base
    from xgboost import XGBClassifier

    class XGBoostSpec(InferenceSpec):
        def load(self, model_dir: str):
            print(model_dir)
            model = XGBClassifier()
            model.load_model(model_dir + "/xgboost-model")
            return model

        def invoke(self, input_object: object, model: object):
            prediction_probabilities = model.predict_proba(input_object)
            predictions = np.argmax(prediction_probabilities, axis=1)
            return predictions

    # Upload evaluation report to s3
    eval_file_name = unique_name_from_base("evaluation")
    eval_report_s3_uri = (
        f"s3://{bucket}/{model_package_group_name}/evaluation-report/{eval_file_name}.json"
    )
    s3_fs = s3fs.S3FileSystem()
    eval_report_str = json.dumps(evaluation)
    with s3_fs.open(eval_report_s3_uri, "wb") as file:
        file.write(eval_report_str.encode("utf-8"))

    # Create model_metrics as per evaluation report in s3
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=eval_report_s3_uri,
            content_type="application/json",
        )
    )

    # Use a few test rows (without the label) as the sample input/output schema
    sample_data = pd.read_csv(sample_data, nrows=10)
    sample_data.pop(label_column)
    schema_builder = SchemaBuilder(
        sample_input=sample_data.to_numpy(),
        sample_output=model.predict(sample_data),
    )

    model_path = Path("/tmp/model/")
    model_path.mkdir(parents=True, exist_ok=True)
    model.save_model(model_path / "xgboost-model")

    # Build the trained model and register it
    model_builder = ModelBuilder(
        model_path=str(model_path),
        inference_spec=XGBoostSpec(),
        schema_builder=schema_builder,
        role_arn=role,
        s3_model_data_url=f"s3://{bucket}/{model_package_group_name}/model-artifacts",
    )
    model_package = model_builder.build().register(
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
    )
    print(f"Registered Model Package ARN: {model_package.model_package_arn}")
    return model_package.model_package_arn
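XGBoostSpec.invoke turns the two-column output of predict_proba into class ids with np.argmax. For a binary model this is equivalent to labelling a sample malignant when its malignant-class probability is strictly above 0.5, because argmax returns the first index on a 0.5/0.5 tie. The stdlib sketch below checks that equivalence on made-up probabilities; it is an illustration, not part of the SageMaker serving code.

```python
def labels_from_probabilities(prob_rows: list) -> list:
    """Class id per row via argmax over [P(benign), P(malignant)].

    max() returns the first maximal index, so a 0.5/0.5 tie gives 0,
    matching np.argmax.
    """
    return [max(range(len(row)), key=row.__getitem__) for row in prob_rows]


def labels_from_threshold(p_malignant: list, threshold: float = 0.5) -> list:
    """Class 1 (malignant) when P(malignant) is strictly above the threshold."""
    return [int(p > threshold) for p in p_malignant]


rows = [[0.9, 0.1], [0.5, 0.5], [0.3, 0.7], [0.51, 0.49]]  # made-up probabilities
print(labels_from_probabilities(rows))  # [0, 0, 1, 0]
print(labels_from_threshold([row[1] for row in rows]))  # [0, 0, 1, 0]
```

The threshold form makes the operating point explicit, which is where you would change it if, for example, you wanted to trade precision for higher recall on malignant cases; the notebook itself keeps the argmax form.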
Putting everything together: creating the Pipeline and running the pipeline execution
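We connect all the @step-decorated functions into a multi-step pipeline by passing the output of one function as the input of the next. Because the SDK traces these dependencies, passing only the final step, delayed_register, to Pipeline is enough for all four steps to be included. Then we create (upsert) the pipeline and start an execution.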
[ ]:
from sagemaker.workflow.pipeline import Pipeline

delayed_data = preprocess(
    raw_data_s3_path=input_path,
    output_prefix=f"{pipeline_name}/dataset",
)
delayed_model = train(train_s3_path=delayed_data[0], validation_s3_path=delayed_data[1])
delayed_evaluation = evaluate(model=delayed_model, test_s3_path=delayed_data[2])
delayed_register = register(
    model=delayed_model,
    evaluation=delayed_evaluation,
    model_approval_status=model_approval_status,
    sample_data=delayed_data[2],
)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        instance_type,
        model_approval_status,
    ],
    steps=[
        delayed_register,
    ],
)
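Listing only delayed_register works because every step's inputs can be traced back to the steps that produced them, so the whole graph is reachable from the last step. The toy model below shows that idea: a depth-first walk from the terminal step recovers all four steps in a valid execution order. The DelayedStep class is hypothetical and is not the SDK's internal type.

```python
class DelayedStep:
    """Hypothetical stand-in for a delayed @step return value."""

    def __init__(self, name: str, *upstream: "DelayedStep"):
        self.name = name
        self.upstream = upstream


def execution_order(terminal: DelayedStep) -> list:
    """Collect every step reachable from `terminal`, each after all of its inputs."""
    order, seen = [], set()

    def visit(step: DelayedStep) -> None:
        if step.name in seen:
            return
        seen.add(step.name)
        for parent in step.upstream:  # visit inputs before the step itself
            visit(parent)
        order.append(step.name)

    visit(terminal)
    return order


# Same wiring as the cell above (names differ so the notebook's functions are not shadowed)
pre_step = DelayedStep("data-preprocessing")
train_step = DelayedStep("model-training", pre_step)
eval_step = DelayedStep("model-evaluation", train_step, pre_step)
reg_step = DelayedStep("model-registration", train_step, eval_step, pre_step)
print(execution_order(reg_step))
```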
[ ]:
pipeline.upsert(role_arn=role)
[ ]:
execution = pipeline.start()
[ ]:
execution.describe()
[ ]:
execution.wait()
[ ]:
execution.list_steps()
Using Trained Model in Model Registry to perform Batch Inference
The pipeline execution automatically runs preprocessing, training, evaluation, and registration of the trained model in the model registry. Model versions in the model registry are candidates for actual deployment, and we can approve a model version after the pipeline execution completes.
The following cell retrieves the ARN of the model version registered by this pipeline execution and updates its status to "Approved".
[ ]:
sm = boto3.client("sagemaker")

# ARN of the model version registered by this pipeline execution
model_package_arn = execution.result(step_name="model-registration")
model_package_update_input_dict = {
    "ModelPackageArn": model_package_arn,
    "ModelApprovalStatus": "Approved",
}
model_package_update_response = sm.update_model_package(**model_package_update_input_dict)
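The cell above approves the version produced by this particular execution. If the batch inference pipeline is run later on its own, you may instead want the newest approved version in the group. A minimal sketch of that lookup with the boto3 list_model_packages call follows; the helper name is our own, and sm_client is any SageMaker boto3 client, such as sm above.

```python
def latest_approved_model_package_arn(sm_client, group_name: str):
    """Return the ARN of the most recently created Approved version in a
    model package group, or None if the group has no approved versions."""
    response = sm_client.list_model_packages(
        ModelPackageGroupName=group_name,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    summaries = response.get("ModelPackageSummaryList", [])
    return summaries[0]["ModelPackageArn"] if summaries else None
```

In this notebook it would be called as latest_approved_model_package_arn(sm, model_package_group_name).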
We can use describe_model_package() to see the details of the approved model package.
[ ]:
sm.describe_model_package(ModelPackageName=model_package_arn)
The "ModelDataUrl" field inside InferenceSpecification indicates the location of the model artifacts. We will use this artifact to load the model for batch inference.
[ ]:
model_package = sm.describe_model_package(ModelPackageName=model_package_arn)
model_artifact_s3_path = model_package["InferenceSpecification"]["Containers"][0]["ModelDataUrl"]
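ModelDataUrl is a plain s3:// URI. s3fs, used in the batch inference step, accepts it directly, but boto3 calls such as the S3 client's download_file(Bucket, Key, Filename) take the bucket and key separately. The small stdlib helper below (hypothetical, not part of any SDK) splits such a URI; the bucket and key in the example are placeholders.

```python
def split_s3_uri(uri: str) -> tuple:
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    prefix = "s3://"
    if not uri.startswith(prefix):
        raise ValueError(f"not an S3 URI: {uri!r}")
    bucket_name, _, key = uri[len(prefix):].partition("/")
    if not bucket_name:
        raise ValueError(f"missing bucket name in {uri!r}")
    return bucket_name, key


print(split_s3_uri("s3://example-bucket/path/to/model.tar.gz"))
```

For example, bucket_name, key = split_s3_uri(model_artifact_s3_path) followed by boto3.client("s3").download_file(bucket_name, key, "model.tar.gz"); note the name bucket_name, which avoids overwriting the notebook's bucket variable.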
We can create another pipeline for batch inference using the approved model version. In this example, we use the original dataset, with the id and diagnosis (label) columns removed, as the data to predict.
[ ]:
# create a dataset for batch prediction
import pandas as pd
batch_dataset_filename = "wdbc_to_be_predicted.csv"
data_df = pd.read_csv(
input_path,
header=None,
names=feature_names,
)
# Remove the first two columns (id and the diagnosis label)
data_df = data_df.drop(columns=["id", label_column])
data_df.to_csv(batch_dataset_filename, index=False)
batch_dataset_s3_path = sagemaker_session.upload_data(
path=batch_dataset_filename,
key_prefix=f"{pipeline_name}/dataset/",
)
[ ]:
inference_pipeline_name = "lowcode-breast-cancer-xgb-inference"
[ ]:
@step(
    name="batch-inference",
    instance_type=instance_type,
    keep_alive_period_in_seconds=300,
)
def batch_inference(
    model_package_s3_path: str,
    dataset_s3_path: str,
    output_s3_path: str,
):
    import numpy as np
    import pandas as pd
    import s3fs
    import tarfile
    from xgboost import XGBClassifier

    # Download the registered model archive and unpack it locally
    package_filename = "serve.tar.gz"
    s3_fs = s3fs.S3FileSystem()
    s3_fs.get_file(model_package_s3_path, package_filename)
    with tarfile.open(package_filename) as tar:
        tar.extractall(path=".")

    # Load the XGBoost model saved by the registration step
    model = XGBClassifier()
    model.load_model("xgboost-model")

    # Predict class ids (1 = malignant, 0 = benign) and write one per line
    df = pd.read_csv(dataset_s3_path)
    prediction_probabilities = model.predict_proba(df)
    predictions = np.argmax(prediction_probabilities, axis=1)
    with s3_fs.open(output_s3_path, "wb") as file:
        file.write("\n".join(np.char.mod("%d", predictions)).encode("utf-8"))
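batch_inference writes one prediction per line, in the same row order as the input CSV, with 1 meaning malignant (the training step encodes "M" as 1). After the pipeline finishes, you could read predictions.txt, for instance with s3fs.S3FileSystem().open(path, "rb").read().decode("utf-8"), and map the ids back to the original labels. The sketch below shows only the decoding step, on a made-up in-memory string; the label mapping is an assumption that mirrors the training code.

```python
from collections import Counter

# Assumption: same encoding as the training step, where the label "M" became 1.
CLASS_LABELS = {0: "B", 1: "M"}


def decode_predictions(text: str) -> list:
    """Map newline-separated 0/1 predictions to the original 'B'/'M' labels."""
    return [CLASS_LABELS[int(line)] for line in text.splitlines() if line.strip()]


decoded = decode_predictions("1\n0\n0\n1")  # made-up predictions, not notebook output
print(decoded, Counter(decoded))
```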
[ ]:
from sagemaker.workflow.pipeline import Pipeline
delayed_inference = batch_inference(
    model_artifact_s3_path,
    batch_dataset_s3_path,
    f"s3://{bucket}/{inference_pipeline_name}/output/predictions.txt",
)

batch_inference_pipeline = Pipeline(
    name=inference_pipeline_name,
    parameters=[
        instance_type,
    ],
    steps=[
        delayed_inference,
    ],
)
[ ]:
batch_inference_pipeline.upsert(role_arn=role)
[ ]:
execution = batch_inference_pipeline.start()
[ ]:
execution.describe()
[ ]:
execution.wait()
[ ]:
execution.list_steps()
Clean up Resources
When you finish with the notebook, you can delete unused resources such as the model packages and the pipelines. The following cells show example clean-up code; adjust the variable names to match your own.
Delete model package
[ ]:
paginator = sm.get_paginator("list_model_packages")
for page in paginator.paginate(ModelPackageGroupName=model_package_group_name):
    for model_package in page["ModelPackageSummaryList"]:
        print(model_package["ModelPackageArn"])
        sm.delete_model_package(ModelPackageName=model_package["ModelPackageArn"])

sm.delete_model_package_group(ModelPackageGroupName=model_package_group_name)
Delete pipeline
[ ]:
pipeline.delete()
batch_inference_pipeline.delete()
Notebook CI Test Results
This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.