Train, Deploy, and Monitor the Music Recommender Model using SageMaker Pipelines

## Background

In this notebook, we’ll build an end-to-end pipeline to create a music recommender using SageMaker Pipelines, which will automate the entire modeling process, from data ingestion through model monitoring. SageMaker Pipelines is a tool for building machine learning pipelines that are directly integrated with SageMaker. Because of this integration, you can create a pipeline and set up SageMaker Projects for orchestration using a tool that handles much of the step creation and management for you.

If you want to learn more about each step of the pipeline, look at the series of notebooks listed below. They implement the same process as this notebook manually, with more detailed descriptions of what each step does. See the README.md for more information about the use case implemented by this sequence of notebooks.

  1. Music Recommender Data Exploration

  2. Music Recommender Data Preparation with SageMaker Feature Store and SageMaker Data Wrangler

  3. Train, Deploy, and Monitor the Music Recommender Model using SageMaker SDK

Contents

  1. Architecture: Create a SageMaker Pipeline to Automate All the Steps from Data Prep to Model Deployment

  2. SageMaker Pipeline Overview

  3. Clean Up

Install and/or update required third-party libraries

[ ]:
!python -m pip install -Uq pip
!python -m pip install -q sagemaker==2.45.0 imbalanced-learn awswrangler

Import libraries

[ ]:
import json
import boto3
import pathlib
import sagemaker
import numpy as np
import pandas as pd
import awswrangler as wr

from sagemaker.estimator import Estimator
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString
from sagemaker.feature_store.feature_group import FeatureGroup
[ ]:
sagemaker.__version__

Set region and boto3 config

[ ]:
import sys
import pprint

sys.path.insert(1, "./code")
[ ]:
region = boto3.Session().region_name
boto3.setup_default_session(region_name=region)
boto_session = boto3.Session(region_name=region)

s3_client = boto3.client("s3", region_name=region)

sagemaker_boto_client = boto_session.client("sagemaker")
sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session, sagemaker_client=sagemaker_boto_client
)
sagemaker_role = sagemaker.get_execution_role()

account_id = boto3.client("sts").get_caller_identity()["Account"]

sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = "music-recommendation-pipeline"
[ ]:
processing_dir = "/opt/ml/processing"

# Output name is auto-generated from the select node's ID + output name from the flow file.
# You can change to a different node ID to export a different step in the flow file
output_name_tracks = "19ad8e80-2002-4ee9-9753-fe9a384b1166.default"  # tracks node in flow file
output_name_user_preferences = (
    "7a6dad19-2c80-43e3-b03d-ec23c3842ae9.default"  # joined node in flow file
)
output_name_ratings = "9a283380-91ca-478e-be99-6ba3bf57c680.default"  # ratings node in flow file

# ======> variables used for parameterizing the notebook run
flow_instance_count = 1
flow_instance_type = "ml.m5.4xlarge"

deploy_model_instance_type = "ml.m4.xlarge"

Architecture: Create a SageMaker Pipeline to Automate All the Steps from Data Prep to Model Deployment


[Figure: architecture diagram]

## Prereqs: Get Data

Here we download the music data for this demo from a public S3 bucket and upload it to the default S3 bucket that was created for you when you first set up your SageMaker Studio workspace.

[ ]:
from demo_helpers import get_data, get_model, update_data_sources
[ ]:
# create data folder
!mkdir -p data
[ ]:
# public S3 bucket that contains our music data
s3_bucket_music_data = "s3://sagemaker-sample-files/datasets/tabular/synthetic-music"
[ ]:
new_data_paths = get_data(
    s3_client,
    [f"{s3_bucket_music_data}/tracks.csv", f"{s3_bucket_music_data}/ratings.csv"],
    bucket,
    prefix,
    sample_data=0.70,
)
print(new_data_paths)
[ ]:
# these are the new file paths located on your SageMaker Studio default s3 storage bucket
tracks_data_source = f"s3://{bucket}/{prefix}/tracks.csv"
ratings_data_source = f"s3://{bucket}/{prefix}/ratings.csv"

For this example, we provide the processed data you need to complete this task, though you are free to look at how we processed it. The next cell downloads the processed files:

[ ]:
files_to_download = [
    "sample_tracks.csv",
    "sample_user.csv",
    "train_data_headers.csv",
    "train_data.zip",
    "val_data_headers.csv",
    "val_data.zip",
    "tracks_new.csv",
    "ratings_new.csv",
]

for file in files_to_download:
    s3_client.download_file(
        "sagemaker-sample-files", f"datasets/tabular/synthetic-music/{file}", f"./data/{file}"
    )
[ ]:
! unzip -o './data/*.zip' -d './data'
! rm ./data/*.zip
[ ]:
# upload the new tracks and ratings data, plus the train and validation datasets
s3_client.upload_file("data/tracks_new.csv", bucket, f"{prefix}/data/tracks_new.csv")
s3_client.upload_file("data/ratings_new.csv", bucket, f"{prefix}/data/ratings_new.csv")
s3_client.upload_file("data/train_data.csv", bucket, f"{prefix}/data/train/train_data.csv")
s3_client.upload_file("data/val_data.csv", bucket, f"{prefix}/data/val/val_data.csv")


train_data_uri = f"s3://{bucket}/{prefix}/data/train/train_data.csv"
val_data_uri = f"s3://{bucket}/{prefix}/data/val/val_data.csv"
print(f"Saving training data to {train_data_uri}")

SageMaker Pipeline Overview


List of Steps

  1. Step 1: Data Wrangler Preprocessing Step

  2. Step 2: Create Dataset and Train/Test Split

  3. Step 3: Train XGBoost Model

  4. Step 4: Model Pre-Deployment Step

  5. Step 5: Register Model

  6. Step 6: Deploy Model

  7. Step 7: Monitor Model Deployed to SageMaker Hosted Endpoint

  8. Combine Steps and Run Pipeline

Now that you’ve done each step of the machine learning workflow manually, you can automate certain steps to allow for faster model experimentation without sacrificing transparency and model tracking. In this section you will create a pipeline that prepares the data, trains a new model, persists the model in SageMaker, adds it to the model registry, deploys it to an endpoint, and sets up monitoring.

Pipeline parameters

An important feature of SageMaker Pipelines is that you can define the steps ahead of time and still change the parameters of those steps at execution time, without redefining the pipeline. You do this by using ParameterInteger, ParameterFloat, or ParameterString to define a value upfront that can be overridden when you later call pipeline.start(parameters=parameters). Only certain step arguments can be parameterized this way.

[ ]:
train_instance_param = ParameterString(
    name="TrainingInstance",
    default_value="ml.m4.xlarge",
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
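Conceptually, each pipeline parameter is a named default that an execution can override by name. The sketch below models that with plain dictionaries. It is not the SageMaker SDK's implementation, and resolve_parameters is a hypothetical helper. It reuses the two parameter names defined above. Rejecting names the pipeline does not declare is a design choice of this sketch.

```python
from typing import Dict, Optional

# Defaults mirroring the two ParameterString objects defined above.
PIPELINE_DEFAULTS: Dict[str, str] = {
    "TrainingInstance": "ml.m4.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
}


def resolve_parameters(
    defaults: Dict[str, str], overrides: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
    """Return the values an execution would see: defaults, replaced by any overrides."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        # This sketch refuses parameter names the pipeline never declared.
        raise KeyError(f"Undeclared pipeline parameters: {sorted(unknown)}")
    resolved = dict(defaults)  # copy so the defaults themselves stay untouched
    resolved.update(overrides)
    return resolved


if __name__ == "__main__":
    print(resolve_parameters(PIPELINE_DEFAULTS, {"TrainingInstance": "ml.m5.4xlarge"}))
```

Only the overridden value changes. Every other parameter keeps the default it was declared with.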

Step 1: Data Wrangler Preprocessing Step

Update the data source in the .flow file

The 01_music_dataprep.flow file is a JSON file containing instructions for where to find your data sources and how to transform the data. We’ll update the object that tells Data Wrangler where to find the input data on S3, setting it to your default S3 bucket. After this update, the .flow file points to your new S3 bucket as the data source used by SageMaker Data Wrangler.

Make sure the .flow file is closed before running the next step; otherwise the new S3 file locations won’t be written to the file.

[ ]:
update_data_sources("01_music_dataprep.flow", tracks_data_source, ratings_data_source)
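The update_data_sources helper in demo_helpers is not shown here. The sketch below shows one way such an update could work. It assumes that each Data Wrangler source node has "type": "SOURCE" and stores its location under parameters.dataset_definition.s3ExecutionContext.s3Uri, keyed by the dataset name. That layout is our assumption about the .flow format and is not taken from this notebook. The function point_sources_at is hypothetical.

```python
import json
from typing import Any, Dict

# ASSUMPTION: source nodes in a .flow file are taken to look like
#   {"type": "SOURCE", "parameters": {"dataset_definition": {
#       "name": "tracks.csv", "s3ExecutionContext": {"s3Uri": "s3://..."}}}}


def point_sources_at(flow: Dict[str, Any], new_uris: Dict[str, str]) -> Dict[str, Any]:
    """Return a copy of flow with the S3 URI of each named SOURCE node replaced (hypothetical)."""
    updated = json.loads(json.dumps(flow))  # deep copy for JSON-shaped data
    for node in updated.get("nodes", []):
        if node.get("type") != "SOURCE":
            continue  # transforms, joins, etc. are left untouched
        definition = node["parameters"]["dataset_definition"]
        name = definition.get("name")
        if name in new_uris:
            definition["s3ExecutionContext"]["s3Uri"] = new_uris[name]
    return updated


if __name__ == "__main__":
    example_flow = {
        "nodes": [
            {
                "type": "SOURCE",
                "parameters": {
                    "dataset_definition": {
                        "name": "tracks.csv",
                        "s3ExecutionContext": {"s3Uri": "s3://old-bucket/tracks.csv"},
                    }
                },
            },
            {"type": "TRANSFORM", "parameters": {}},
        ]
    }
    new_flow = point_sources_at(example_flow, {"tracks.csv": "s3://my-bucket/prefix/tracks.csv"})
    print(json.dumps(new_flow, indent=2))
```

To persist the change, you would write the returned dictionary back to the .flow file with json.dump.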

Upload flow to S3

This will become an input to the first step and, as such, needs to be in S3.

[ ]:
# name of the flow file which should exist in the current notebook working directory
flow_file_name = "01_music_dataprep.flow"

s3_client.upload_file(
    Filename=flow_file_name, Bucket=bucket, Key=f"{prefix}/dataprep-notebooks/music_dataprep.flow"
)
flow_s3_uri = f"s3://{bucket}/{prefix}/dataprep-notebooks/music_dataprep.flow"

print(f"Data Wrangler flow {flow_file_name} uploaded to {flow_s3_uri}")
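The cell above spells the same S3 location twice: once as a key for upload_file and once as an s3:// URI. A small helper can derive both from one list of parts so they cannot drift apart. The helpers s3_location and split_s3_uri are hypothetical; they are not part of the SageMaker SDK or demo_helpers.

```python
from typing import Tuple


def s3_location(bucket: str, *parts: str) -> Tuple[str, str]:
    """Return (key, uri) built from the same path parts."""
    key = "/".join(p.strip("/") for p in parts if p)
    return key, f"s3://{bucket}/{key}"


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/key' into (bucket, key)."""
    if not uri.startswith("s3://"):
        raise ValueError(f"Not an S3 URI: {uri}")
    bucket, _, key = uri[len("s3://"):].partition("/")
    return bucket, key


if __name__ == "__main__":
    # "my-bucket" is a placeholder for the notebook's default bucket.
    key, uri = s3_location(
        "my-bucket", "music-recommendation-pipeline", "dataprep-notebooks", "music_dataprep.flow"
    )
    print(key)
    print(uri)
    print(split_s3_uri(uri))
```

In the notebook, the key would go to s3_client.upload_file(..., Key=key), and the URI would become flow_s3_uri.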

Define Data Wrangler step’s inputs

In this step, new data from the S3 locations passed as source= below will be transformed according to the SageMaker Data Wrangler .flow file and then added to the existing feature groups we created in the 02 notebooks.

[ ]:
data_sources = []

## Input - S3 Source: tracks_new.csv
data_sources.append(
    ProcessingInput(
        source=f"s3://{bucket}/{prefix}/data/tracks_new.csv",  # You can override this to point to another dataset on S3
        destination=f"{processing_dir}/data/tracks_new.csv",
        input_name="tracks_new.csv",
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_data_distribution_type="FullyReplicated",
    )
)

## Input - S3 Source: ratings_new.csv
data_sources.append(
    ProcessingInput(
        source=f"s3://{bucket}/{prefix}/data/ratings_new.csv",  # You can override this to point to another dataset on S3
        destination=f"{processing_dir}/data/ratings_new.csv",
        input_name="ratings_new.csv",
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_data_distribution_type="FullyReplicated",
    )
)

## Input - Flow: 01_music_dataprep.flow
flow_input = ProcessingInput(
    source=flow_s3_uri,
    destination=f"{processing_dir}/flow",
    input_name="flow",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated",
)

Define outputs for the Data Wrangler step

[ ]:
# Define feature group names we previously created in notebooks 02a-c
fg_name_tracks = "track-features-music-rec"
fg_name_ratings = "ratings-features-music-rec"
fg_name_user_preferences = "user-5star-track-features-music-rec"
[ ]:
flow_output_tracks = sagemaker.processing.ProcessingOutput(
    output_name=output_name_tracks,
    app_managed=True,
    feature_store_output=sagemaker.processing.FeatureStoreOutput(feature_group_name=fg_name_tracks),
)

flow_output_user_preferences = sagemaker.processing.ProcessingOutput(
    output_name=output_name_user_preferences,
    app_managed=True,
    feature_store_output=sagemaker.processing.FeatureStoreOutput(
        feature_group_name=fg_name_user_preferences
    ),
)

flow_output_ratings = sagemaker.processing.ProcessingOutput(
    output_name=output_name_ratings,
    app_managed=True,
    feature_store_output=sagemaker.processing.FeatureStoreOutput(
        feature_group_name=fg_name_ratings
    ),
)
[ ]:
# Output configuration used as processing job container arguments
output_config_tracks = {output_name_tracks: {"content_type": "CSV"}}

output_config_user_preferences = {output_name_user_preferences: {"content_type": "CSV"}}

output_config_ratings = {output_name_ratings: {"content_type": "CSV"}}
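Each output config is serialized with json.dumps and wrapped in single quotes inside one job_arguments string. The sketch below shows what that string looks like for the tracks output. It also shows that a shell-style split (shlex.split) recovers the flag and the JSON intact. How the Data Wrangler container parses the argument is not shown in this notebook, so treat this only as an illustration of the quoting.

```python
import json
import shlex

# Output name copied from output_name_tracks above.
output_name = "19ad8e80-2002-4ee9-9753-fe9a384b1166.default"
output_config = {output_name: {"content_type": "CSV"}}

# The same single string that the ProcessingStep passes in job_arguments.
argument = f"--output-config '{json.dumps(output_config)}'"
print(argument)

# A POSIX shell-style split yields the flag and the JSON payload as two tokens.
flag, payload = shlex.split(argument)
assert flag == "--output-config"
assert json.loads(payload) == output_config
```

The single-quote wrapping works because json.dumps emits double quotes. It would break if a key or value ever contained a single quote.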

Define processor and processing step

[ ]:
from sagemaker.network import NetworkConfig
[ ]:
# Data Wrangler Container URL
# You can also find the proper container uri by exporting your Data Wrangler flow to a pipeline notebook

container_uri = sagemaker.image_uris.retrieve(framework="data-wrangler", region=region)


flow_processor = sagemaker.processing.Processor(
    role=sagemaker_role,
    image_uri=container_uri,
    instance_count=flow_instance_count,
    instance_type=flow_instance_type,
    volume_size_in_gb=30,
    network_config=NetworkConfig(enable_network_isolation=False),
    sagemaker_session=sagemaker_session,
)

flow_step_tracks = ProcessingStep(
    name="DataWranglerStepTracks",
    processor=flow_processor,
    inputs=[flow_input] + data_sources,
    outputs=[flow_output_tracks],
    job_arguments=[f"--output-config '{json.dumps(output_config_tracks)}'"],
)

flow_step_ratings = ProcessingStep(
    name="DataWranglerStepRatings",
    processor=flow_processor,
    inputs=[flow_input] + data_sources,
    outputs=[flow_output_ratings],
    job_arguments=[f"--output-config '{json.dumps(output_config_ratings)}'"],
)

flow_step_user_preferences = ProcessingStep(
    name="DataWranglerStepUserPref",
    processor=flow_processor,
    inputs=[flow_input] + data_sources,
    outputs=[flow_output_user_preferences],
    job_arguments=[f"--output-config '{json.dumps(output_config_user_preferences)}'"],
)

Step 2: Create Dataset and Train/Test Split

[ ]:
s3_client.upload_file(
    Filename="./code/create_datasets.py", Bucket=bucket, Key=f"{prefix}/code/create_datasets.py"
)
create_dataset_script_uri = f"s3://{bucket}/{prefix}/code/create_datasets.py"

create_dataset_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sagemaker_role,
    instance_type="ml.m5.4xlarge",
    instance_count=2,
    volume_size_in_gb=100,
    base_job_name="music-rec-pipeline-split-data",
    sagemaker_session=sagemaker_session,
)

create_dataset_step = ProcessingStep(
    name="SplitData",
    processor=create_dataset_processor,
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="train_data", source=f"{processing_dir}/output/train"
        ),
        sagemaker.processing.ProcessingOutput(
            output_name="test_data", source=f"{processing_dir}/output/test"
        ),
    ],
    job_arguments=[
        "--feature-group-name-tracks",
        fg_name_tracks,
        "--feature-group-name-ratings",
        fg_name_ratings,
        "--feature-group-name-user-preferences",
        fg_name_user_preferences,
        "--bucket-name",
        bucket,
        "--bucket-prefix",
        prefix,
        "--region",
        region,
    ],
    code=create_dataset_script_uri,
    depends_on=[flow_step_tracks.name, flow_step_ratings.name, flow_step_user_preferences.name],
)
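create_datasets.py itself is not shown here. As a sketch of the kind of work a split step does, the functions below shuffle rows with a fixed seed and split them into train and test sets. They then write header-less CSV with the label moved to the first column, which is the CSV layout SageMaker's built-in XGBoost algorithm expects. The function names, the 20% test fraction, and the column layout are assumptions for illustration, not what the script actually does.

```python
import csv
import io
import random
from typing import List, Sequence, Tuple

Row = List[str]


def split_rows(rows: Sequence[Row], test_fraction: float, seed: int = 42) -> Tuple[List[Row], List[Row]]:
    """Shuffle rows deterministically and split them into (train, test)."""
    if not 0.0 < test_fraction < 1.0:
        raise ValueError("test_fraction must be between 0 and 1")
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)  # same seed, same split
    n_test = int(round(len(shuffled) * test_fraction))
    return shuffled[n_test:], shuffled[:n_test]


def to_label_first_csv(rows: Sequence[Row], label_index: int) -> str:
    """Serialize rows as header-less CSV with the label column moved to the front."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for row in rows:
        features = [value for i, value in enumerate(row) if i != label_index]
        writer.writerow([row[label_index]] + features)
    return buffer.getvalue()


if __name__ == "__main__":
    # Hypothetical rows: feature_a, feature_b, rating (label last in the source data).
    rows = [[str(i), str(i * 2), str(i % 5)] for i in range(10)]
    train, test = split_rows(rows, test_fraction=0.2, seed=0)
    print(len(train), len(test))
    print(to_label_first_csv(test, label_index=2))
```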

Step 3: Train XGBoost Model

In this step we use the ParameterString train_instance_param defined at the beginning of the pipeline.

[ ]:
hyperparameters = {
    "max_depth": "4",
    "eta": "0.2",
    "objective": "reg:squarederror",
    "num_round": "100",
}

save_interval = 5
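For reference, here is what these hyperparameters control. With objective set to reg:squarederror, XGBoost minimizes squared error. Each of the num_round boosting rounds adds a new tree f_t, capped at max_depth levels and scaled by the learning rate eta. The symbols below use standard gradient-boosting notation, not notation from this notebook:

```latex
\ell(y_i, \hat{y}_i) = \tfrac{1}{2}\,(y_i - \hat{y}_i)^2, \qquad
g_i = \frac{\partial \ell}{\partial \hat{y}_i} = \hat{y}_i - y_i, \qquad
h_i = \frac{\partial^2 \ell}{\partial \hat{y}_i^{2}} = 1

\hat{y}_i^{(t)} = \hat{y}_i^{(t-1)} + \eta\, f_t(x_i), \qquad t = 1, \dots, T, \quad T = 100,\ \eta = 0.2
```

A smaller eta shrinks each tree's contribution, so more rounds are usually needed to reach the same fit.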
[ ]:
xgb_estimator = Estimator(
    role=sagemaker_role,
    instance_count=2,
    instance_type=train_instance_param,  # ParameterString, overridable at pipeline.start()
    volume_size=60,
    image_uri=sagemaker.image_uris.retrieve("xgboost", region, "0.90-2"),
    hyperparameters=hyperparameters,
    output_path=f"s3://{bucket}/{prefix}/training_jobs",
    base_job_name="xgb-music-rec-pipeline-model",
    max_run=1800,
)
[ ]:
train_step = TrainingStep(
    name="TrainStep",
    estimator=xgb_estimator,
    inputs={
        "train": sagemaker.inputs.TrainingInput(
            s3_data=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                "train_data"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": sagemaker.inputs.TrainingInput(
            s3_data=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                "test_data"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)

Step 4: Model Pre-Deployment Step

[ ]:
model = sagemaker.model.Model(
    name="music-rec-pipeline-xgboost-model",
    image_uri=train_step.properties.AlgorithmSpecification.TrainingImage,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=sagemaker_role,
)

inputs = sagemaker.inputs.CreateModelInput(instance_type="ml.m4.xlarge")

create_model_step = CreateModelStep(name="CreateModel", model=model, inputs=inputs)

Step 5: Register Model

In this step you will use the ParameterString model_approval_status defined at the outset of the pipeline code.

[ ]:
register_step = RegisterModel(
    name="XgboostRegisterModel",
    estimator=xgb_estimator,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=prefix,
    approval_status=model_approval_status,
)
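Because model_approval_status is a free-form ParameterString, a misspelled override (for example "approved" in lowercase) would probably not be caught until the registration step runs. A client-side check before pipeline.start(...) can catch it earlier. The helper check_approval_status is hypothetical. The three allowed values are those of the ModelApprovalStatus field in the SageMaker API.

```python
from typing import Mapping

# Valid values of ModelApprovalStatus in the SageMaker model registry API.
ALLOWED_APPROVAL_STATUSES = frozenset({"Approved", "Rejected", "PendingManualApproval"})


def check_approval_status(
    parameters: Mapping[str, str], default: str = "PendingManualApproval"
) -> str:
    """Return the approval status an execution would use, or raise if it is invalid."""
    status = parameters.get("ModelApprovalStatus", default)
    if status not in ALLOWED_APPROVAL_STATUSES:
        raise ValueError(
            f"ModelApprovalStatus must be one of {sorted(ALLOWED_APPROVAL_STATUSES)}, got {status!r}"
        )
    return status


if __name__ == "__main__":
    print(check_approval_status({"TrainingInstance": "ml.m5.4xlarge"}))
    print(check_approval_status({"ModelApprovalStatus": "Approved"}))
```

In the notebook, you would call check_approval_status(parameters) just before pipeline.start(parameters=parameters).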

Step 6: Deploy Model

[ ]:
s3_client.upload_file(
    Filename="./code/deploy_model.py", Bucket=bucket, Key=f"{prefix}/code/deploy_model.py"
)
deploy_model_script_uri = f"s3://{bucket}/{prefix}/code/deploy_model.py"
pipeline_endpoint_name = "music-rec-pipeline-endpoint"

deploy_model_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sagemaker_role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    volume_size_in_gb=60,
    base_job_name="music-recommender-deploy-model",
    sagemaker_session=sagemaker_session,
)

deploy_step = ProcessingStep(
    name="DeployModel",
    processor=deploy_model_processor,
    job_arguments=[
        "--model-name",
        create_model_step.properties.ModelName,
        "--region",
        region,
        "--endpoint-instance-type",
        deploy_model_instance_type,
        "--endpoint-name",
        pipeline_endpoint_name,
    ],
    code=deploy_model_script_uri,
)
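deploy_model.py is not shown in this notebook. The sketch below suggests what such a script might do with the job_arguments above. It parses the four flags with argparse and builds the request for the SageMaker CreateEndpointConfig API. The endpoint config naming scheme, the variant name AllTraffic, and the single instance are assumptions. The boto3 calls are left out so the sketch runs offline.

```python
import argparse
from typing import Dict, List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the flags that the DeployModel step passes in job_arguments."""
    parser = argparse.ArgumentParser(description="Hypothetical deploy script sketch")
    parser.add_argument("--model-name", required=True)
    parser.add_argument("--region", required=True)
    parser.add_argument("--endpoint-instance-type", required=True)
    parser.add_argument("--endpoint-name", required=True)
    return parser.parse_args(argv)


def endpoint_config_request(args: argparse.Namespace) -> Dict[str, object]:
    """Build keyword arguments for a CreateEndpointConfig call."""
    return {
        "EndpointConfigName": f"{args.endpoint_name}-config",  # hypothetical naming scheme
        "ProductionVariants": [
            {
                "VariantName": "AllTraffic",
                "ModelName": args.model_name,
                "InitialInstanceCount": 1,
                "InstanceType": args.endpoint_instance_type,
                "InitialVariantWeight": 1.0,
            }
        ],
    }


if __name__ == "__main__":
    example_args = parse_args([
        "--model-name", "example-model",
        "--region", "us-east-1",
        "--endpoint-instance-type", "ml.m4.xlarge",
        "--endpoint-name", "music-rec-pipeline-endpoint",
    ])
    print(endpoint_config_request(example_args))
```

In a real script, the request would be passed to boto3.client("sagemaker", region_name=args.region).create_endpoint_config(**request). That call would be followed by create_endpoint(EndpointName=args.endpoint_name, EndpointConfigName=request["EndpointConfigName"]). For the monitoring step that follows, the endpoint config would also need data capture enabled through the DataCaptureConfig field of the same API.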

Step 7: Monitor Model Deployed to SageMaker Hosted Endpoint

[ ]:
s3_client.upload_file(
    Filename="./code/model_monitor.py", Bucket=bucket, Key=f"{prefix}/code/model_monitor.py"
)
model_monitor_script_uri = f"s3://{bucket}/{prefix}/code/model_monitor.py"
mon_schedule_name_base = "music-rec-pipeline-daily-monitor"


model_monitor_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sagemaker_role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    volume_size_in_gb=60,
    base_job_name="music-rec-pipeline-model-monitor",
    sagemaker_session=sagemaker_session,
)

monitor_model_step = ProcessingStep(
    name="ModelMonitor",
    processor=model_monitor_processor,
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="model_baseline", source=f"{processing_dir}/output/baselineresults"
        )
    ],
    job_arguments=[
        "--baseline-data-uri",
        val_data_uri,
        "--bucket-name",
        bucket,
        "--bucket-prefix",
        prefix,
        "--endpoint",
        pipeline_endpoint_name,
        "--region",
        region,
        "--schedule-name",
        mon_schedule_name_base,
    ],
    code=model_monitor_script_uri,
    depends_on=[deploy_step.name],
)
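model_monitor.py is not shown, so its exact baselining logic is unknown here. Conceptually, Model Monitor derives baseline statistics from a reference dataset, here the validation data passed as --baseline-data-uri. It later compares captured endpoint traffic against that baseline. The toy sketch below illustrates the idea with a simple mean-shift rule measured in baseline standard deviations. This rule and the 3.0 threshold are our own simplification, not Model Monitor's actual constraint checks.

```python
import statistics
from typing import Dict, List, Sequence


def baseline_stats(columns: Dict[str, Sequence[float]]) -> Dict[str, Dict[str, float]]:
    """Compute per-column mean and population standard deviation from baseline data."""
    return {
        name: {"mean": statistics.fmean(values), "std": statistics.pstdev(values)}
        for name, values in columns.items()
    }


def drifted_columns(
    baseline: Dict[str, Dict[str, float]],
    live: Dict[str, Sequence[float]],
    threshold: float = 3.0,
) -> List[str]:
    """Flag columns whose live mean is more than `threshold` baseline stds from the baseline mean."""
    flagged = []
    for name, stats in baseline.items():
        live_mean = statistics.fmean(live[name])
        std = stats["std"] or 1e-12  # avoid dividing by zero for constant columns
        if abs(live_mean - stats["mean"]) / std > threshold:
            flagged.append(name)
    return flagged


if __name__ == "__main__":
    base = baseline_stats({"a": [1.0, 2.0, 3.0], "b": [10.0, 10.0, 10.0]})
    print(drifted_columns(base, {"a": [10.0, 11.0, 12.0], "b": [10.0, 10.0, 10.0]}))
```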

Combine Steps and Run Pipeline

Once all of our steps are defined, we can put them together using the SageMaker Pipeline object. We pass the steps in order so the definition is easier to read, but technically the order does not matter: the pipeline DAG is derived from the dependencies between steps. If one step's input is another step's output, or a step lists another in depends_on, SageMaker Pipelines runs them in the right order.
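To see why the order of the steps list does not matter, here is a stdlib sketch that treats the steps as a graph. The dependency map is our reading of the step definitions above, combining property references such as train_step.properties with explicit depends_on. Grouping steps into "waves" is a simplification: as we understand it, SageMaker Pipelines starts a step as soon as everything it depends on has finished. graphlib requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Step name -> names of the steps it depends on, read off the definitions above.
STEP_DEPENDENCIES: Dict[str, Set[str]] = {
    "DataWranglerStepTracks": set(),
    "DataWranglerStepRatings": set(),
    "DataWranglerStepUserPref": set(),
    "SplitData": {"DataWranglerStepTracks", "DataWranglerStepRatings", "DataWranglerStepUserPref"},
    "TrainStep": {"SplitData"},
    "CreateModel": {"TrainStep"},
    "XgboostRegisterModel": {"TrainStep"},
    "DeployModel": {"CreateModel"},
    "ModelMonitor": {"DeployModel"},
}


def execution_waves(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group steps so that each wave depends only on steps in earlier waves."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # also raises CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)
    return waves


if __name__ == "__main__":
    for number, wave in enumerate(execution_waves(STEP_DEPENDENCIES), start=1):
        print(number, wave)
```

Reversing the order of the dictionary entries produces the same waves. This mirrors the point that listing the steps in a different order does not change the pipeline.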

[ ]:
pipeline_name = "MusicRecommendationPipeline"
dataprep_pipeline_name = "MusicRecommendationDataPrepPipeline"
train_deploy_pipeline_name = "MusicRecommendationTrainDeployPipeline"

Option 1: The Entire Pipeline End to end

[ ]:
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[train_instance_param, model_approval_status],
    steps=[
        flow_step_tracks,
        flow_step_user_preferences,
        flow_step_ratings,
        create_dataset_step,
        train_step,
        create_model_step,
        register_step,
        deploy_step,
        monitor_model_step,
    ],
)

Note: Instead of running data prep and train/deploy as one long pipeline, consider breaking the pipeline into two parts:

Data Prep Pipeline

Sometimes we may want to run a number of data prep steps and split the data, getting it ready for training and beyond. This may require multiple iterations. We can separate this process from the rest of the pipeline by including only these data prep steps in their own smaller data prep pipeline.

Train Deploy Monitor Pipeline

Splitting the pipeline keeps data preparation separate from training, tuning, deployment, inference, and monitoring, so you can kick off a retraining-only run, a data-prep-only run, or the complete pipeline as needed. With SageMaker Pipelines you have the flexibility to do any of these in a modular and iterative manner.

Option 2: Data Prep Pipeline

[ ]:
pipeline_dataprep = Pipeline(
    name=dataprep_pipeline_name,
    parameters=[train_instance_param, model_approval_status],
    steps=[flow_step_tracks, flow_step_user_preferences, flow_step_ratings, create_dataset_step],
)

Option 3: Train Deploy Monitor Pipeline

[ ]:
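# Same name as create_dataset_step ("SplitData"): train_step reads its outputs through
# create_dataset_step.properties, which the pipeline definition resolves by step name.
# depends_on is omitted so this pipeline does not require the Data Wrangler steps.
create_dataset_step_no_depend = ProcessingStep(
    name="SplitData",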
    processor=create_dataset_processor,
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="train_data", source=f"{processing_dir}/output/train"
        ),
        sagemaker.processing.ProcessingOutput(
            output_name="test_data", source=f"{processing_dir}/output/test"
        ),
    ],
    job_arguments=[
        "--feature-group-name-tracks",
        fg_name_tracks,
        "--feature-group-name-ratings",
        fg_name_ratings,
        "--feature-group-name-user-preferences",
        fg_name_user_preferences,
        "--bucket-name",
        bucket,
        "--bucket-prefix",
        prefix,
        "--region",
        region,
    ],
    code=create_dataset_script_uri,
)
[ ]:
pipeline_train_deploy_monitor = Pipeline(
    name=train_deploy_pipeline_name,
    parameters=[train_instance_param, model_approval_status],
    steps=[
        create_dataset_step_no_depend,
        train_step,
        create_model_step,
        register_step,
        deploy_step,
        monitor_model_step,
    ],
)

Submit the pipeline definition to the SageMaker Pipeline service

Note: If a pipeline with the same name already exists, it will be overwritten.

The cell below upserts all three pipelines; you choose which one to run when you start an execution.

[ ]:
pipeline.upsert(role_arn=sagemaker_role)
pipeline_dataprep.upsert(role_arn=sagemaker_role)
pipeline_train_deploy_monitor.upsert(role_arn=sagemaker_role)

View the entire pipeline definition

Viewing the pipeline definition with all the string variables interpolated can help debug pipeline errors. It is commented out here due to its length.

[ ]:
# json.loads(pipeline.describe()['PipelineDefinition'])
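When debugging, the cross-step references often matter most. As far as we know, references to parameters and to other steps' properties appear in the definition JSON as objects of the form {"Get": "..."}. The hypothetical helper below walks a parsed definition and lists every such reference with its location. This makes it easier to spot, for example, a step reading an output name that no step produces. The fragment in the example is hand-written, not copied from this pipeline.

```python
import json
from typing import Any, Iterator, Tuple


def find_get_expressions(node: Any, path: str = "$") -> Iterator[Tuple[str, str]]:
    """Yield (json_path, expression) for every {"Get": ...} reference in a definition."""
    if isinstance(node, dict):
        if set(node) == {"Get"} and isinstance(node["Get"], str):
            yield path, node["Get"]
            return
        for key, value in node.items():
            yield from find_get_expressions(value, f"{path}.{key}")
    elif isinstance(node, list):
        for index, item in enumerate(node):
            yield from find_get_expressions(item, f"{path}[{index}]")


if __name__ == "__main__":
    # Illustrative fragment. In the notebook you would instead pass
    # json.loads(pipeline.describe()["PipelineDefinition"]).
    fragment = json.loads(
        '{"Steps": [{"Name": "TrainStep", "Arguments": '
        '{"ResourceConfig": {"InstanceType": {"Get": "Parameters.TrainingInstance"}}}}]}'
    )
    for where, expression in find_get_expressions(fragment):
        print(where, "->", expression)
```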

Run the pipeline

Note: This will take about 1 hour to complete. You can watch the progress of the pipeline execution in the SageMaker Studio Components panel.

[ ]:
# Special pipeline parameters can be defined or changed here
parameters = {"TrainingInstance": "ml.m5.4xlarge"}

Earlier in the notebook, we defined several ProcessingStep()s, a TrainingStep(), a CreateModelStep(), and a RegisterModel() step, which the Pipeline() instance here references and kicks off.

[ ]:
%%time
start_response = pipeline.start(parameters=parameters)
# start_response = pipeline_dataprep.start(parameters=parameters)
# start_response = pipeline_train_deploy_monitor.start(parameters=parameters)
start_response.wait(delay=60, max_attempts=1000)
start_response.describe()

After completion, we can use SageMaker Studio’s Components and Registries tab to see our pipeline graph and any further error or log messages.

Clean Up


[ ]:
import demo_helpers

demo_helpers.delete_project_resources(
    sagemaker_boto_client=sagemaker_boto_client,
    sagemaker_session=sagemaker_session,
    endpoint_names=[pipeline_endpoint_name],
    pipeline_names=[pipeline_name, dataprep_pipeline_name, train_deploy_pipeline_name],
    prefix=prefix,
    delete_s3_objects=True,
    bucket_name=bucket,
)