Using @step Decorated Step with EMR Step


This notebook’s CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.

This us-west-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable


This notebook illustrates how an EMR step can be run from a SageMaker Pipeline that uses the low-code interface for training and registering the machine learning model. The EMR step uses a cluster config to create an EMR cluster, performs the required job and finally closes the cluster. All the remaining steps in the pipeline are configured automatically using the step decorator.

The steps in this pipeline include: * Preprocess the UCI Heart Failure dataset with PySpark on EMR * Train an XGBoost model * Evaluate model performance * Register model

Contents

  1. Prerequisites

  2. Configuration Setup

  3. Parameters

  4. Data Preparation

  5. Model Training and Evaluation

  6. Model Registry

  7. Execute the Pipeline

  8. Cleanup

Prerequisites

To run this notebook you will need:

EMR roles

You will have to create following roles in order to make EMR work: * Service role for Amazon EMR (EMR role) - this is passed as the ServiceRole parameter * Service role for cluster EC2 instances (EC2 instance profile) - this is passed as the JobFlowRole parameter

See ‘EMR IAM roles’ for more details.

IAM policy to enable the notebook to run a step on an Amazon EMR cluster

The notebook execution role should have policies which enable the notebook to run a step on an Amazon EMR cluster. The Amazon managed policy AmazonSageMakerPipelinesIntegrations should be added to the notebook execution role.

Setup

[ ]:
!pip install pip --upgrade --quiet
!pip install boto --quiet
[ ]:
!pip install -r ./requirements.txt --quiet
[ ]:
import os

# Set path to config file
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()

Configuration Setup

Let’s now configure the session and the client for Amazon SageMaker, and all necessary configurations for the pipelines (e.g., input ad output bucket).

[ ]:
import boto3
import sagemaker

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
region = sagemaker_session.boto_region_name
boto_session = boto3.Session(region_name=region)
sagemaker_client = sagemaker_session.sagemaker_client
default_bucket = sagemaker_session.default_bucket()

account = boto_session.client("sts").get_caller_identity()["Account"]

Parameters

These are parameters that will be passed to the SageMaker pipeline when it executes.

[ ]:
model_package_group_name = "HeartFailurePackageGroup"
pipeline_name = "EMRStepPipeline"
base_job_prefix = "emr-step-pipeline"
processing_instance_type = "ml.m5.xlarge"
training_instance_type = "ml.m5.xlarge"
BASE_DIR = "code"

# IAM roles configured in the Prerequisites section
job_flow_role = f"arn:aws:iam::{account}:instance-profile/EMR_EC2_DefaultRole"
service_role = f"arn:aws:iam::{account}:role/EMR_DefaultRole_V2"
[ ]:
# Define variables and parameters needed for the Pipeline steps
# parameters for pipeline execution
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputDataUrl",
    default_value=f"s3://sagemaker-example-files-prod-{region}/datasets/tabular/uci_heart_failure/heart_failure_clinical_records_dataset.csv",
)
instance_type_param = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")

# Output path for the data preparation step to run on Amazon EMR
output_path = f"s3://{default_bucket}/{base_job_prefix}/prep"

Data Preparation

A PySpark job on EMR is used to prepare the for the training job. Using the script preprocess.py, the dataset is featurized and split into train, test, and validation datasets. The output of this step is used as the input to the train_model function.

This notebook uses the UCI Heart Failure Clinical Records Dataset [1]. The objective in this notebook is to predict the survival of patients with heart failure, which is a binary classification problem.

[1] Chicco, D., & Jurman, G. (2020). Machine learning can predict survival of patients with heart failure from serum creatinine and ejection fraction alone. BMC medical informatics and decision making, 20(1), 1-16.

[ ]:
%mkdir code
[ ]:
%%writefile code/preprocess.py

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array
from pyspark.ml import Pipeline
from argparse import ArgumentParser, Namespace


def process(args: Namespace):
    print("Starting Spark session")
    spark = SparkSession.builder.appName("preprocess").getOrCreate()
    spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

    print("Reading source data")
    df = spark.read.csv(args.input, header=True, inferSchema=True)

    from pyspark.ml.feature import (
        StandardScaler,
        VectorAssembler,
    )

    features = [
        "platelets",
        "serum_creatinine",
        "age",
        "anaemia",
        "creatinine_phosphokinase",
        "diabetes",
        "ejection_fraction",
        "high_blood_pressure",
        "serum_sodium",
        "sex",
        "smoking",
        "time",
    ]
    print("Performing feature engineering")
    pipeline = Pipeline(
        stages=[
            VectorAssembler(inputCols=features, outputCol="vector", handleInvalid="skip"),
            StandardScaler(inputCol="vector", outputCol="features"),
        ]
    )
    print("Fitting transformers")
    model = pipeline.fit(df)
    print("Transforming source data")
    df_out = (
        model.transform(df)
        .select(
            "DEATH_EVENT",
            vector_to_array(F.col("features")).alias("features"),
        )
        .select([F.col("DEATH_EVENT")] + [F.col("features")[idx] for idx in range(len(features))])
    )

    # Shuffle, split train/test/valid, and write out to csv files without headers
    print("Writing train/valid/test splits")
    train, valid, test = df_out.orderBy(F.rand()).randomSplit([0.7, 0.15, 0.15])
    prefix = args.output
    train.repartition(1).write.mode("overwrite").csv(f"{prefix}/train")
    valid.repartition(1).write.mode("overwrite").csv(f"{prefix}/valid")
    test.repartition(1).write.mode("overwrite").csv(f"{prefix}/test")

    spark.stop()

    return


if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("--input")
    parser.add_argument("--output")
    args, _ = parser.parse_known_args()
    process(args)
[ ]:
script = sagemaker_session.upload_data("code/preprocess.py", key_prefix=f"{base_job_prefix}/app")

Next we configure the EMRStep using SageMaker Pipelines.

[ ]:
# Process the training data step using a PySpark script.
# Split the training data set into train, test, and validation datasets
# Run as a step as a job flow on EMR

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig

emr_config = EMRStepConfig(
    jar="command-runner.jar",
    args=[
        "spark-submit",
        "--deploy-mode",
        "cluster",
        script,
        "--input",
        input_data,
        "--output",
        output_path,
    ],
)

step_emr = EMRStep(
    name="HeartFailureEMR",
    cluster_id=None,
    step_config=emr_config,
    display_name="Preprocess",
    description="Preprocess data for XGBoost",
    cluster_config={
        "Applications": [
            {
                "Name": "Spark",
            }
        ],
        "Instances": {
            "InstanceGroups": [
                {"InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.2xlarge"},
                {"InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "m5.2xlarge"},
            ]
        },
        "BootstrapActions": [],
        "ReleaseLabel": "emr-6.6.0",
        "JobFlowRole": job_flow_role,
        "ServiceRole": service_role,
    },
)

In this step, the training and validation data from the previous step are taken to train a model via XGBoost.

[ ]:
import pandas as pd
import boto3
from io import StringIO

from sagemaker.workflow.function_step import step

from xgboost import XGBClassifier


def read_df_from_s3(bucket: str, prefix: str) -> pd.DataFrame:
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(bucket)
    prefix_objs = bucket.objects.filter(Prefix=prefix)
    prefix_df = []
    for obj in prefix_objs:
        body = obj.get()["Body"].read().decode("utf-8")
        df = pd.read_csv(StringIO(body), header=None)
        prefix_df.append(df)

    return pd.concat(prefix_df)


@step(
    name="HeartFailureTrain",
    instance_type=instance_type_param,
)
def train_model(
    num_round: int = 50,
    objective: str = "binary:logistic",
    max_depth: int = 5,
    eta: float = 0.2,
    gamma: int = 4,
    min_child_weight: int = 6,
    subsample: float = 0.7,
    use_gpu: bool = False,
):
    # Load and process training data
    train_prefix = f"{base_job_prefix}/prep/train"
    train_df = read_df_from_s3(default_bucket, train_prefix)
    y_train = train_df.iloc[:, 0].to_numpy()
    train_df.drop(train_df.columns[0], axis=1, inplace=True)
    x_train = train_df.to_numpy()
    # Load and process validation data
    validation_prefix = f"{base_job_prefix}/prep/valid"
    validation_df = read_df_from_s3(default_bucket, validation_prefix)
    y_validation = validation_df.iloc[:, 0].to_numpy()
    validation_df.drop(validation_df.columns[0], axis=1, inplace=True)
    x_validation = validation_df.to_numpy()

    param = {
        "objective": objective,
        "max_depth": max_depth,
        "eta": eta,
        "gamma": gamma,
        "min_child_weight": min_child_weight,
        "subsample": subsample,
        "tree_method": "gpu_hist" if use_gpu else "hist",  # Use GPU accelerated algorithm
    }

    xgb = XGBClassifier(n_estimators=num_round, **param)
    xgb.fit(
        x_train,
        y_train,
        eval_set=[(x_validation, y_validation)],
        early_stopping_rounds=5,
    )

    return xgb

In this step, the trained model is evaluated as per its accuracy.

[ ]:
import numpy as np

from sklearn.metrics import accuracy_score


@step(name="HeartFailureEval")
def evaluate_model(model) -> dict:
    test_prefix = f"{base_job_prefix}/prep/test"
    test_df = read_df_from_s3(default_bucket, test_prefix)
    y_test = test_df.iloc[:, 0].to_numpy().astype(int)
    test_df.drop(test_df.columns[0], axis=1, inplace=True)
    x_test = test_df.to_numpy()

    predictions = (model.predict_proba(x_test)[:, 1] > 0.5).astype(int)

    accuracy = accuracy_score(y_test, predictions)

    report_dict = {
        "regression_metrics": {
            "accuracy": {"value": accuracy},
        },
    }
    print(f"evaluation report: {report_dict}")

    return report_dict

In this step, we register the trained model to Model Registry using ModelBuilder to build model artifacts for inference.

[ ]:
import json
import s3fs
from sagemaker.serve.builder.model_builder import ModelBuilder
from sagemaker.serve.builder.schema_builder import SchemaBuilder
from sagemaker.workflow.parameters import ParameterString
from sagemaker import ModelMetrics, MetricsSource
from sagemaker.s3_utils import s3_path_join
from sagemaker.utils import unique_name_from_base


@step(name="HeartFailureRegisterModel")
def register_model(
    model,
    evaluation: dict,
    model_approval_status: ParameterString,
):
    # Upload evaluation report to S3
    eval_file_name = unique_name_from_base("evaluation")
    eval_report_s3_uri = s3_path_join(
        "s3://",
        default_bucket,
        model_package_group_name,
        f"evaluation-report/{eval_file_name}.json",
    )
    s3_fs = s3fs.S3FileSystem()
    eval_report_str = json.dumps(evaluation)
    with s3_fs.open(eval_report_s3_uri, "wb") as file:
        file.write(eval_report_str.encode("utf-8"))

    # Create model_metrics as per evaluation report in s3
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=eval_report_s3_uri,
            content_type="application/json",
        )
    )

    # Model schema definitions
    # Note: The sample_input/sample_output only references the data type
    schema_builder = SchemaBuilder(
        sample_input=np.array([0]),
        sample_output=np.array([0]),
    )

    # Build the trained model and register it
    model_builder = ModelBuilder(
        model=model,
        schema_builder=schema_builder,
        role_arn=role,
        s3_model_data_url=s3_path_join(
            "s3://", default_bucket, model_package_group_name, "model-artifacts"
        ),
    )

    model_package = model_builder.build().register(
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
    )

    print(f"Registered Model Package ARN: {model_package.model_package_arn}")
    return model_package.model_package_arn

Execute the Pipeline

Now we are going to define a Pipeline that will combine the various steps defined in this notebook. Note that to indicate that train_model depends on step_emr, we have to call the add_depends_on function.

[ ]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.step_outputs import get_step

# Causal logic for the execution pipeline
delayed_model = train_model()
get_step(delayed_model).add_depends_on([step_emr])
delayed_evaluation = evaluate_model(model=delayed_model)
get_step(delayed_evaluation).add_depends_on([step_emr])

delayed_register = register_model(
    model=delayed_model,
    evaluation=delayed_evaluation,
    model_approval_status=model_approval_status,
)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        model_approval_status,
        input_data,
        instance_type_param,
    ],
    steps=[delayed_register],
)
[ ]:
pipeline.upsert(role_arn=role)
[ ]:
execution = pipeline.start()
[ ]:
execution.describe()
[ ]:
execution.wait()
[ ]:
execution.list_steps()

Cleanup

Running the following cell will delete the following resources created in this notebook

[ ]:
# Delete the Pipeline
sagemaker_client.delete_pipeline(PipelineName=pipeline_name)

Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.

This us-east-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This us-east-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This us-west-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ca-central-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This sa-east-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-west-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-west-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-west-3 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-central-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-north-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-southeast-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-southeast-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-northeast-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-northeast-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-south-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

[ ]: