SageMaker Pipelines with MLflow

Setup environment

Import necessary libraries

[ ]:
import os

import sagemaker
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.function_step import step
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep

Declare some variables used later

[ ]:
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name

pipeline_name = "breast-cancer-xgb"
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")

# MLflow (replace these values with your own)
tracking_server_arn = "your tracking server arn"
experiment_name = "sm-pipelines-experiment"
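Because the placeholder value above is only caught once a pipeline step tries to reach the tracking server, it can help to sanity-check the ARN locally first. The following is a minimal sketch, not part of the original notebook. The regular expression encodes an assumed ARN layout (`arn:aws:sagemaker:<region>:<account-id>:mlflow-tracking-server/<name>`), and the helper name `looks_like_tracking_server_arn` is hypothetical.

```python
import re

# Assumed ARN layout for a SageMaker MLflow tracking server; adjust it if your
# ARN differs.
TRACKING_SERVER_ARN_PATTERN = re.compile(
    r"^arn:aws[a-z-]*:sagemaker:[a-z0-9-]+:\d{12}:mlflow-tracking-server/.+$"
)


def looks_like_tracking_server_arn(value: str) -> bool:
    """Return True if the value matches the assumed tracking server ARN layout."""
    return TRACKING_SERVER_ARN_PATTERN.match(value) is not None


# The unedited placeholder from the cell above is rejected.
print(looks_like_tracking_server_arn("your tracking server arn"))  # False

# A made-up ARN with a dummy account ID is accepted.
print(
    looks_like_tracking_server_arn(
        "arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/example"
    )
)  # True
```

A check like this only validates the shape of the string; it does not confirm that the tracking server exists or that the execution role can access it.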

Write requirements and config files that’ll be used by the steps in our SageMaker Pipeline

[ ]:
%%writefile config.yaml
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        # role arn is not required if in SageMaker Notebook instance or SageMaker Studio
        # Uncomment the following line and replace with the right execution role if in a local IDE
        # RoleArn: <replace the role arn here>
        InstanceType: ml.m5.xlarge
        Dependencies: ./requirements.txt
        IncludeLocalWorkDir: true
        CustomFileFilter:
          IgnoreNamePatterns: # files or directories to ignore
          - "*.ipynb" # all notebook files
[ ]:
# Point the SageMaker SDK at the directory that contains config.yaml
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()
[ ]:
%%writefile requirements.txt
scikit-learn
xgboost==1.7.6
s3fs==0.4.2
sagemaker>=2.199.0,<3
pandas>=2.0.0
gevent
geventhttpclient
shap
matplotlib
fsspec
mlflow==2.13.2
sagemaker-mlflow==0.1.0

Define the SageMaker Pipeline

Preprocessing Step

[ ]:
# Location of our dataset
input_path = f"s3://sagemaker-example-files-prod-{region}/datasets/tabular/breast_cancer/wdbc.csv"

The Breast Cancer Wisconsin dataset contains an id column, which we do not use for training. The second column, diagnosis, is the class label: ‘M’ for the malignant class and ‘B’ for the benign class.

In the preprocessing step, we drop the column id, then split the dataset into three distinct sets: train, validation, and test set.
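To make the resulting set sizes concrete, here is a rough sketch of the arithmetic behind the two-stage split: 20% of the rows are held out, and the held-out rows are then halved into validation and test sets. It assumes scikit-learn rounds a fractional test size up to the next whole row, and it uses 569 rows, the count commonly cited for this dataset. The function name `split_sizes` is hypothetical.

```python
import math


def split_sizes(n_rows: int, holdout_fraction: float = 0.2) -> tuple[int, int, int]:
    """Return (train, validation, test) row counts for the two-stage split."""
    # First split: the held-out share is rounded up (assumed ceil rule).
    holdout = math.ceil(holdout_fraction * n_rows)
    train = n_rows - holdout
    # Second split: half of the held-out rows become the test set.
    test = math.ceil(0.5 * holdout)
    validation = holdout - test
    return train, validation, test


print(split_sizes(569))  # (455, 57, 57)
```

Under these assumptions the model trains on roughly 80% of the rows, with about 10% each left for validation and testing.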

Note that the @step decorator accepts an optional keep_alive_period_in_seconds parameter, which sets how many seconds an instance stays alive after a step finishes so that the next pipeline step can reuse it. Setting this parameter speeds up pipeline execution because fewer new instances need to be launched. The steps in this notebook leave it unset.

[ ]:
random_state = 2023
label_column = "diagnosis"

feature_names = [
    "id",
    "diagnosis",
    "radius_mean",
    "texture_mean",
    "perimeter_mean",
    "area_mean",
    "smoothness_mean",
    "compactness_mean",
    "concavity_mean",
    "concave points_mean",
    "symmetry_mean",
    "fractal_dimension_mean",
    "radius_se",
    "texture_se",
    "perimeter_se",
    "area_se",
    "smoothness_se",
    "compactness_se",
    "concavity_se",
    "concave points_se",
    "symmetry_se",
    "fractal_dimension_se",
    "radius_worst",
    "texture_worst",
    "perimeter_worst",
    "area_worst",
    "smoothness_worst",
    "compactness_worst",
    "concavity_worst",
    "concave points_worst",
    "symmetry_worst",
    "fractal_dimension_worst",
]


@step(
    name="DataPreprocessing",
    instance_type=instance_type,
)
def preprocess(
    raw_data_s3_path: str,
    output_prefix: str,
    experiment_name: str,
    run_name: str,
    test_size: float = 0.2,
) -> tuple:
    import mlflow
    import pandas as pd
    from sklearn.model_selection import train_test_split

    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_name=run_name) as run:
        run_id = run.info.run_id
        with mlflow.start_run(run_name="DataPreprocessing", nested=True):
            df = pd.read_csv(raw_data_s3_path, header=None, names=feature_names)
            df.drop(columns="id", inplace=True)
            mlflow.log_input(
                mlflow.data.from_pandas(df, raw_data_s3_path, targets=label_column),
                context="DataPreprocessing",
            )

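            # Hold out `test_size` of the rows, then halve the holdout into validation and test.
            train_df, holdout_df = train_test_split(
                df, test_size=test_size, stratify=df[label_column], random_state=random_state
            )
            validation_df, test_df = train_test_split(
                holdout_df,
                test_size=0.5,
                stratify=holdout_df[label_column],
                random_state=random_state,
            )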
            train_df.reset_index(inplace=True, drop=True)
            validation_df.reset_index(inplace=True, drop=True)
            test_df.reset_index(inplace=True, drop=True)

            train_s3_path = f"s3://{bucket}/{output_prefix}/train.csv"
            val_s3_path = f"s3://{bucket}/{output_prefix}/val.csv"
            test_s3_path = f"s3://{bucket}/{output_prefix}/test.csv"

            train_df.to_csv(train_s3_path, index=False)
            validation_df.to_csv(val_s3_path, index=False)
            test_df.to_csv(test_s3_path, index=False)

    return train_s3_path, val_s3_path, test_s3_path, experiment_name, run_id

Training Step

In this step we train an XGBoost model. The @step-decorated function takes the S3 paths of the training and validation sets, along with the XGBoost hyperparameters. Both S3 paths come from the output of the previous step.

[ ]:
use_gpu = False
param = dict(
    objective="binary:logistic",
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    tree_method="gpu_hist" if use_gpu else "hist",  # "gpu_hist" needs a GPU instance; "hist" runs on CPU
)
num_round = 50


@step(
    name="ModelTraining",
    instance_type=instance_type,
)
def train(
    train_s3_path: str,
    validation_s3_path: str,
    experiment_name: str,
    run_id: str,
    param: dict = param,
    num_round: int = num_round,
):
    import mlflow
    import pandas as pd
    from xgboost import XGBClassifier

    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(run_name="ModelTraining", nested=True) as training_run:
            training_run_id = training_run.info.run_id
            mlflow.xgboost.autolog(
                log_input_examples=True,
                log_model_signatures=True,
                log_models=True,
                log_datasets=True,
                model_format="xgb",
            )

            # read data files from S3
            train_df = pd.read_csv(train_s3_path)
            validation_df = pd.read_csv(validation_s3_path)

            # create dataframe and label series
            y_train = (train_df.pop(label_column) == "M").astype("int")
            y_validation = (validation_df.pop(label_column) == "M").astype("int")

            xgb = XGBClassifier(n_estimators=num_round, **param)
            xgb.fit(
                train_df,
                y_train,
                eval_set=[(validation_df, y_validation)],
                early_stopping_rounds=5,
            )

        # Pass the run identifiers on so that later steps can attach to the same MLflow runs
        return experiment_name, run_id, training_run_id
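The call to fit above passes early_stopping_rounds=5, so training can end before all num_round boosting rounds have run. The following is a simplified, library-free sketch of that stopping rule, assuming a validation metric where lower is better. The function name and the loss values are made up for illustration, and XGBoost's own implementation handles more cases than this.

```python
def early_stopping_round(validation_losses, patience=5):
    """Return (best_round, last_round_run) for a lower-is-better metric."""
    best_round = 0
    best_loss = float("inf")
    for round_index, loss in enumerate(validation_losses):
        if loss < best_loss:
            # New best score: remember where it happened.
            best_loss = loss
            best_round = round_index
        elif round_index - best_round >= patience:
            # No improvement for `patience` consecutive rounds: stop here.
            return best_round, round_index
    # The metric kept improving often enough, so every round was run.
    return best_round, len(validation_losses) - 1


# Made-up validation losses; the best value (0.44) occurs at round 3.
losses = [0.60, 0.50, 0.45, 0.44, 0.46, 0.47, 0.45, 0.46, 0.48, 0.43]
print(early_stopping_round(losses))  # (3, 8)
```

In this example training stops at round 8 and never sees the lower loss at round 9, which is the usual trade-off of a small patience value.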

Evaluation Step

In this step, we create a @step-decorated function to evaluate the trained XGBoost model on the test dataset.
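The evaluation step returns only the F1 score, which the pipeline definition later compares against a threshold of 0.8. As a reminder of what that number measures, here is a small worked sketch using the standard definition of F1 as the harmonic mean of precision and recall. The confusion counts are hypothetical and are not results from this notebook; in the pipeline the value is computed by mlflow.evaluate.

```python
def f1_from_counts(true_positives: int, false_positives: int, false_negatives: int) -> float:
    """Compute F1 for the positive class from confusion-matrix counts."""
    predicted_positive = true_positives + false_positives
    actual_positive = true_positives + false_negatives
    precision = true_positives / predicted_positive if predicted_positive else 0.0
    recall = true_positives / actual_positive if actual_positive else 0.0
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)


# Hypothetical counts: 19 malignant cases found, 1 false alarm, 2 missed.
f1 = f1_from_counts(true_positives=19, false_positives=1, false_negatives=2)
print(round(f1, 4))  # 0.9268

# The same comparison the pipeline's condition performs on the real score.
print(f1 >= 0.8)  # True
```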

[ ]:
@step(
    name="ModelEvaluation",
    instance_type=instance_type,
)
def evaluate(
    test_s3_path: str,
    experiment_name: str,
    run_id: str,
    training_run_id: str,
) -> dict:
    import mlflow
    import pandas as pd

    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(run_name="ModelEvaluation", nested=True):
            test_df = pd.read_csv(test_s3_path)
            test_df[label_column] = (test_df[label_column] == "M").astype("int")
            model = mlflow.pyfunc.load_model(f"runs:/{training_run_id}/model")

            results = mlflow.evaluate(
                model=model,
                data=test_df,
                targets=label_column,
                model_type="classifier",
                evaluators=["default"],
            )
            return {"f1_score": results.metrics["f1_score"]}

Model Registration

In this step, we create a @step-decorated function to register our XGBoost model.

[ ]:
@step(
    name="ModelRegistration",
    instance_type=instance_type,
)
def register(
    pipeline_name: str,
    experiment_name: str,
    run_id: str,
    training_run_id: str,
):
    import mlflow

    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(run_name="ModelRegistration", nested=True):
            mlflow.register_model(f"runs:/{training_run_id}/model", pipeline_name)

Creating the SageMaker Pipeline

We connect all defined pipeline @step functions into a multi-step pipeline. Then, we submit and execute the pipeline.

[ ]:
preprocessing_step = preprocess(
    raw_data_s3_path=input_path,
    output_prefix=f"{pipeline_name}/dataset",
    experiment_name=experiment_name,
    run_name=ExecutionVariables.PIPELINE_EXECUTION_ID,
)

training_step = train(
    train_s3_path=preprocessing_step[0],
    validation_s3_path=preprocessing_step[1],
    experiment_name=preprocessing_step[3],
    run_id=preprocessing_step[4],
)

conditional_register_step = ConditionStep(
    name="ConditionalRegister",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            left=evaluate(
                test_s3_path=preprocessing_step[2],
                experiment_name=preprocessing_step[3],
                run_id=preprocessing_step[4],
                training_run_id=training_step[2],
            )["f1_score"],
            right=0.8,
        )
    ],
    if_steps=[
        register(
            pipeline_name=pipeline_name,
            experiment_name=preprocessing_step[3],
            run_id=preprocessing_step[4],
            training_run_id=training_step[2],
        )
    ],
    else_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        instance_type,
    ],
    steps=[preprocessing_step, training_step, conditional_register_step],
)
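The integer indexes in the cell above (for example preprocessing_step[3] and training_step[2]) refer to positions in the tuples returned by preprocess and train. One way to keep those positions readable is to name them, as in the sketch below. The enum classes and the stand-in values are hypothetical. Whether the SDK's step-output objects accept an IntEnum as an index is not verified here, so wrapping the member in int(...) may be the safer choice inside a real pipeline definition.

```python
from enum import IntEnum


class PreprocessOutput(IntEnum):
    """Positions in the tuple returned by preprocess."""

    TRAIN_S3_PATH = 0
    VALIDATION_S3_PATH = 1
    TEST_S3_PATH = 2
    EXPERIMENT_NAME = 3
    RUN_ID = 4


class TrainOutput(IntEnum):
    """Positions in the tuple returned by train."""

    EXPERIMENT_NAME = 0
    RUN_ID = 1
    TRAINING_RUN_ID = 2


# Stand-in values only; a real pipeline passes step outputs, not plain strings.
example_preprocess_outputs = (
    "s3://example-bucket/train.csv",
    "s3://example-bucket/val.csv",
    "s3://example-bucket/test.csv",
    "sm-pipelines-experiment",
    "example-run-id",
)

print(example_preprocess_outputs[PreprocessOutput.RUN_ID])  # example-run-id
print(int(TrainOutput.TRAINING_RUN_ID))  # 2
```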
[ ]:
pipeline.upsert(role_arn=role)

Execute the SageMaker Pipeline

[ ]:
execution = pipeline.start()
[ ]:
execution.describe()
[ ]:
execution.wait()
[ ]:
execution.list_steps()
