SageMaker Pipelines with MLflow
Setup environment
Import necessary libraries
[ ]:
import os
import sagemaker
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.function_step import step
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep
Declare some variables used later
[ ]:
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name
pipeline_name = "breast-cancer-xgb"
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
# MLflow (replace these values with your own)
tracking_server_arn = "your tracking server arn"
experiment_name = "sm-pipelines-experiment"
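Because the tracking server ARN above is a placeholder, a quick sanity check before building the pipeline can catch a forgotten replacement. The following is a minimal sketch, not part of the original notebook: `looks_like_tracking_server_arn` is a hypothetical helper, and the pattern assumes the usual `arn:<partition>:sagemaker:<region>:<account-id>:mlflow-tracking-server/<name>` layout.

```python
import re

# Assumed ARN layout for a SageMaker MLflow tracking server (illustrative only).
TRACKING_SERVER_ARN_PATTERN = re.compile(
    r"^arn:aws[a-z-]*:sagemaker:[a-z0-9-]+:\d{12}:mlflow-tracking-server/.+$"
)


def looks_like_tracking_server_arn(value: str) -> bool:
    """Return True if `value` has the general shape of a tracking server ARN."""
    return bool(TRACKING_SERVER_ARN_PATTERN.match(value))


# The placeholder from the cell above does not pass the check.
print(looks_like_tracking_server_arn("your tracking server arn"))
# A made-up ARN with the expected shape does.
print(
    looks_like_tracking_server_arn(
        "arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/my-server"
    )
)
```

The check only validates the shape of the string; it does not confirm that the server exists or that the execution role can reach it.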
Write requirements and config files that’ll be used by the steps in our SageMaker Pipeline
[ ]:
%%writefile config.yaml
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        # role arn is not required if in SageMaker Notebook instance or SageMaker Studio
        # Uncomment the following line and replace with the right execution role if in a local IDE
        # RoleArn: <replace the role arn here>
        InstanceType: ml.m5.xlarge
        Dependencies: ./requirements.txt
        IncludeLocalWorkDir: true
        CustomFileFilter:
          IgnoreNamePatterns: # files or directories to ignore
          - "*.ipynb" # all notebook files
[ ]:
# Point the SageMaker SDK at the directory that contains config.yaml
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()
[ ]:
%%writefile requirements.txt
scikit-learn
xgboost==1.7.6
s3fs==0.4.2
sagemaker>=2.199.0,<3
pandas>=2.0.0
gevent
geventhttpclient
shap
matplotlib
fsspec
mlflow==2.13.2
sagemaker-mlflow==0.1.0
Define the SageMaker Pipeline
Preprocessing Step
[ ]:
# Location of our dataset
input_path = f"s3://sagemaker-example-files-prod-{region}/datasets/tabular/breast_cancer/wdbc.csv"
The Breast Cancer Wisconsin dataset contains an id column, which we do not use for training. The second column, diagnosis, is the class label: ‘M’ for malignant and ‘B’ for benign.
In the preprocessing step, we drop the id column and then split the dataset into three sets: training (80%), validation (10%), and test (10%).
The @step decorator also accepts a keep_alive_period_in_seconds parameter, which sets how many seconds an instance stays alive after a step finishes so that the next pipeline step can reuse it. Setting it can shorten pipeline execution because fewer new instances have to be launched. The steps in this notebook do not set it.
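As a rough illustration of the split described above, the sketch below reproduces only the size arithmetic of the two-stage split: 20% of the rows are held out, and the holdout is then halved into validation and test sets. It rests on two assumptions not stated in this notebook: that scikit-learn rounds the held-out portion up to the next whole row, and that the dataset has 569 rows, the commonly cited size of the Wisconsin diagnostic dataset. `split_sizes` is a hypothetical helper.

```python
import math


def split_sizes(n_rows: int, test_size: float = 0.2, holdout_split: float = 0.5):
    """Return (train, validation, test) row counts for the two-stage split."""
    # Stage 1: hold out `test_size` of all rows (rounded up).
    n_holdout = math.ceil(test_size * n_rows)
    n_train = n_rows - n_holdout
    # Stage 2: split the holdout into validation and test sets.
    n_test = math.ceil(holdout_split * n_holdout)
    n_validation = n_holdout - n_test
    return n_train, n_validation, n_test


print(split_sizes(569))  # (455, 57, 57)
```

The actual row assignment is done by `train_test_split` with stratification on the label; this helper only shows how many rows each set receives.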
[ ]:
random_state = 2023
label_column = "diagnosis"
feature_names = [
    "id",
    "diagnosis",
    "radius_mean",
    "texture_mean",
    "perimeter_mean",
    "area_mean",
    "smoothness_mean",
    "compactness_mean",
    "concavity_mean",
    "concave points_mean",
    "symmetry_mean",
    "fractal_dimension_mean",
    "radius_se",
    "texture_se",
    "perimeter_se",
    "area_se",
    "smoothness_se",
    "compactness_se",
    "concavity_se",
    "concave points_se",
    "symmetry_se",
    "fractal_dimension_se",
    "radius_worst",
    "texture_worst",
    "perimeter_worst",
    "area_worst",
    "smoothness_worst",
    "compactness_worst",
    "concavity_worst",
    "concave points_worst",
    "symmetry_worst",
    "fractal_dimension_worst",
]
@step(
    name="DataPreprocessing",
    instance_type=instance_type,
)
def preprocess(
    raw_data_s3_path: str,
    output_prefix: str,
    experiment_name: str,
    run_name: str,
    test_size: float = 0.2,
) -> tuple:
    import mlflow
    import pandas as pd
    from sklearn.model_selection import train_test_split

    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)
    # Parent run for the whole pipeline execution; later steps reuse its run_id
    with mlflow.start_run(run_name=run_name) as run:
        run_id = run.info.run_id
        with mlflow.start_run(run_name="DataPreprocessing", nested=True):
            df = pd.read_csv(raw_data_s3_path, header=None, names=feature_names)
            df.drop(columns="id", inplace=True)
            mlflow.log_input(
                mlflow.data.from_pandas(df, raw_data_s3_path, targets=label_column),
                context="DataPreprocessing",
            )
            # Hold out `test_size` of the rows, then split the holdout evenly
            # into validation and test sets
            train_df, test_df = train_test_split(
                df,
                test_size=test_size,
                stratify=df[label_column],
                random_state=random_state,
            )
            validation_df, test_df = train_test_split(
                test_df,
                test_size=0.5,
                stratify=test_df[label_column],
                random_state=random_state,
            )
            train_df.reset_index(inplace=True, drop=True)
            validation_df.reset_index(inplace=True, drop=True)
            test_df.reset_index(inplace=True, drop=True)
            train_s3_path = f"s3://{bucket}/{output_prefix}/train.csv"
            val_s3_path = f"s3://{bucket}/{output_prefix}/val.csv"
            test_s3_path = f"s3://{bucket}/{output_prefix}/test.csv"
            train_df.to_csv(train_s3_path, index=False)
            validation_df.to_csv(val_s3_path, index=False)
            test_df.to_csv(test_s3_path, index=False)

    return train_s3_path, val_s3_path, test_s3_path, experiment_name, run_id
Training Step
In this step, we train an XGBoost model with a @step-decorated function that takes the S3 paths of the training and validation sets, along with the XGBoost hyperparameters. Both S3 paths come from the output of the previous step.
[ ]:
use_gpu = False
param = dict(
    objective="binary:logistic",
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    # GPU-accelerated histogram algorithm when use_gpu is True, CPU histogram otherwise
    tree_method="gpu_hist" if use_gpu else "hist",
)
num_round = 50
@step(
    name="ModelTraining",
    instance_type=instance_type,
)
def train(
    train_s3_path: str,
    validation_s3_path: str,
    experiment_name: str,
    run_id: str,
    param: dict = param,
    num_round: int = num_round,
):
    import mlflow
    import pandas as pd
    from xgboost import XGBClassifier

    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(run_name="ModelTraining", nested=True) as training_run:
            training_run_id = training_run.info.run_id
            mlflow.xgboost.autolog(
                log_input_examples=True,
                log_model_signatures=True,
                log_models=True,
                log_datasets=True,
                model_format="xgb",
            )
            # read data files from S3
            train_df = pd.read_csv(train_s3_path)
            validation_df = pd.read_csv(validation_s3_path)
            # create dataframe and label series
            y_train = (train_df.pop(label_column) == "M").astype("int")
            y_validation = (validation_df.pop(label_column) == "M").astype("int")
            xgb = XGBClassifier(n_estimators=num_round, **param)
            xgb.fit(
                train_df,
                y_train,
                eval_set=[(validation_df, y_validation)],
                early_stopping_rounds=5,
            )

    # return xgb
    return experiment_name, run_id, training_run_id
Evaluation Step
In this step, we create a @step-decorated function to evaluate the trained XGBoost model on the test dataset.
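The evaluation step reports an F1 score, which the pipeline later compares against a threshold. As a reminder of what that number means, here is a minimal pure-Python sketch of the binary F1 computation. It assumes the reported metric is the standard binary F1 with the malignant class encoded as 1; the function below is a hypothetical stand-in for illustration, not the routine MLflow uses internally.

```python
def f1_score(y_true, y_pred, positive=1):
    """Harmonic mean of precision and recall for the positive class."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    if tp == 0:
        # No true positives: precision and recall are both zero (or undefined).
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


# Made-up labels: 1 = malignant, 0 = benign
y_true = [1, 1, 1, 0, 0, 0]
y_pred = [1, 1, 0, 1, 0, 0]
print(round(f1_score(y_true, y_pred), 3))  # 0.667
```

With two true positives, one false positive, and one false negative, precision and recall are both 2/3, so the F1 score is also 2/3.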
[ ]:
@step(
    name="ModelEvaluation",
    instance_type=instance_type,
)
def evaluate(
    test_s3_path: str,
    experiment_name: str,
    run_id: str,
    training_run_id: str,
) -> dict:
    import mlflow
    import pandas as pd

    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(run_name="ModelEvaluation", nested=True):
            test_df = pd.read_csv(test_s3_path)
            test_df[label_column] = (test_df[label_column] == "M").astype("int")
            model = mlflow.pyfunc.load_model(f"runs:/{training_run_id}/model")
            results = mlflow.evaluate(
                model=model,
                data=test_df,
                targets=label_column,
                model_type="classifier",
                evaluators=["default"],
            )

    return {"f1_score": results.metrics["f1_score"]}
Model Registration
In this step, we create a @step-decorated function to register our XGBoost model.
[ ]:
@step(
    name="ModelRegistration",
    instance_type=instance_type,
)
def register(
    pipeline_name: str,
    experiment_name: str,
    run_id: str,
    training_run_id: str,
):
    import mlflow

    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(run_name="ModelRegistration", nested=True):
            mlflow.register_model(f"runs:/{training_run_id}/model", pipeline_name)
Creating the SageMaker Pipeline
We connect all defined pipeline @step functions into a multi-step pipeline. Then, we submit and execute the pipeline.
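The conditional part of the pipeline can be read as a simple decision on the evaluation result. The sketch below expresses that decision in plain Python, using the step names and the 0.8 threshold from the pipeline definition in this section. `choose_branch` is a hypothetical helper for illustration only; in the real pipeline the comparison is made by the ConditionStep at execution time.

```python
F1_THRESHOLD = 0.8  # same threshold as the pipeline's condition


def choose_branch(f1_score: float, threshold: float = F1_THRESHOLD) -> str:
    """Return the name of the step that runs for a given F1 score."""
    # Greater-than-or-equal comparison: a score exactly at the threshold passes.
    return "ModelRegistration" if f1_score >= threshold else "Fail"


print(choose_branch(0.93))  # ModelRegistration
print(choose_branch(0.75))  # Fail
```

Because the comparison is inclusive, a model scoring exactly 0.8 is still registered.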
[ ]:
preprocessing_step = preprocess(
    raw_data_s3_path=input_path,
    output_prefix=f"{pipeline_name}/dataset",
    experiment_name=experiment_name,
    run_name=ExecutionVariables.PIPELINE_EXECUTION_ID,
)

training_step = train(
    train_s3_path=preprocessing_step[0],
    validation_s3_path=preprocessing_step[1],
    experiment_name=preprocessing_step[3],
    run_id=preprocessing_step[4],
)

conditional_register_step = ConditionStep(
    name="ConditionalRegister",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            left=evaluate(
                test_s3_path=preprocessing_step[2],
                experiment_name=preprocessing_step[3],
                run_id=preprocessing_step[4],
                training_run_id=training_step[2],
            )["f1_score"],
            right=0.8,
        )
    ],
    if_steps=[
        register(
            pipeline_name=pipeline_name,
            experiment_name=preprocessing_step[3],
            run_id=preprocessing_step[4],
            training_run_id=training_step[2],
        )
    ],
    else_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        instance_type,
    ],
    steps=[preprocessing_step, training_step, conditional_register_step],
)
[ ]:
pipeline.upsert(role_arn=role)
Execute the SageMaker Pipeline
[ ]:
execution = pipeline.start()
[ ]:
execution.describe()
[ ]:
execution.wait()
[ ]:
execution.list_steps()
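The list returned by `execution.list_steps()` can be verbose. A small helper like the one below condenses it to one status per step. This is a sketch under the assumption that each entry is a dictionary with `StepName` and `StepStatus` keys; `summarize_steps` and `failed_steps` are hypothetical helpers, and the sample data is made up rather than taken from a real execution.

```python
def summarize_steps(steps):
    """Map each step name to its status."""
    return {step["StepName"]: step["StepStatus"] for step in steps}


def failed_steps(steps):
    """Return the names of steps whose status is 'Failed'."""
    return [step["StepName"] for step in steps if step["StepStatus"] == "Failed"]


# Hypothetical sample shaped like the entries of execution.list_steps()
sample_steps = [
    {"StepName": "Fail", "StepStatus": "Failed"},
    {"StepName": "ModelEvaluation", "StepStatus": "Succeeded"},
    {"StepName": "ModelTraining", "StepStatus": "Succeeded"},
    {"StepName": "DataPreprocessing", "StepStatus": "Succeeded"},
]

print(summarize_steps(sample_steps))
print(failed_steps(sample_steps))  # ['Fail']
```

In the notebook, the same helpers could be applied as `summarize_steps(execution.list_steps())` once the execution has finished.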