[1]:
# Parameters
kms_key = "arn:aws:kms:us-west-2:000000000000:1234abcd-12ab-34cd-56ef-1234567890ab"

SageMaker Pipelines Lambda Step

This notebook illustrates how a Lambda function can be run as a step in a SageMaker Pipeline.

The steps in this pipeline include:

  * Preprocess the Abalone dataset

  * Train an XGBoost Model

  * Evaluate the model performance

  * Create a model

  * Deploy the model to a SageMaker Hosted Endpoint using a Lambda Function, through SageMaker Pipelines

A step to register the model into a Model Registry can be added to the pipeline using the RegisterModel step.

Runtime

This notebook takes approximately 15 minutes to run.

Contents

  1. Prerequisites

  2. Configuration Setup

  3. Data Preparation

  4. Model Training and Evaluation

  5. Setting up Lambda

  6. Execute the Pipeline

  7. Clean up resources

Prerequisites

The notebook execution role should have policies which enable the notebook to create a Lambda function. The Amazon managed policy AmazonSageMakerPipelinesIntegrations can be added to the notebook execution role to achieve the same effect.

The policy description is as follows:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:CreateFunction",
                "lambda:DeleteFunction",
                "lambda:InvokeFunction",
                "lambda:UpdateFunctionCode"
            ],
            "Resource": [
                "arn:aws:lambda:*:*:function:*sagemaker*",
                "arn:aws:lambda:*:*:function:*sageMaker*",
                "arn:aws:lambda:*:*:function:*SageMaker*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:CreateQueue",
                "sqs:SendMessage"
            ],
            "Resource": [
                "arn:aws:sqs:*:*:*sagemaker*",
                "arn:aws:sqs:*:*:*sageMaker*",
                "arn:aws:sqs:*:*:*SageMaker*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": "arn:aws:iam::*:role/*",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": [
                        "lambda.amazonaws.com"
                    ]
                }
            }
        }
    ]
}
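
The resource patterns in this policy mean that the notebook role can only manage Lambda functions whose names contain `sagemaker`, `sageMaker`, or `SageMaker`. The sketch below approximates IAM's `*` wildcard with Python's `fnmatch.fnmatchcase` to check a candidate function name before creating it. The helper name `lambda_name_is_covered`, the region, and the account ID are illustrative assumptions, and real IAM evaluation involves more than this pattern match.

```python
from fnmatch import fnmatchcase

# Resource patterns copied from the Lambda statement of the policy above.
LAMBDA_RESOURCE_PATTERNS = [
    "arn:aws:lambda:*:*:function:*sagemaker*",
    "arn:aws:lambda:*:*:function:*sageMaker*",
    "arn:aws:lambda:*:*:function:*SageMaker*",
]


def lambda_name_is_covered(function_name, region="us-west-2", account_id="000000000000"):
    """Return True if the function's ARN matches one of the policy patterns.

    Hypothetical helper: fnmatchcase is case-sensitive and treats `*` as
    "any run of characters", which only approximates IAM wildcard matching.
    """
    arn = f"arn:aws:lambda:{region}:{account_id}:function:{function_name}"
    return any(fnmatchcase(arn, pattern) for pattern in LAMBDA_RESOURCE_PATTERNS)


print(lambda_name_is_covered("sagemaker-lambda-step-endpoint-deploy"))  # True
print(lambda_name_is_covered("endpoint-deploy"))  # False
```

This is why the Lambda function created later in the notebook is given a name that starts with `sagemaker-`.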

Let’s start by importing necessary packages and installing the SageMaker Python SDK.

[2]:
import os
import time
import boto3
import sagemaker

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    Processor,
    ScriptProcessor,
)

from sagemaker import Model
from sagemaker.xgboost import XGBoostPredictor
from sagemaker.sklearn.processing import SKLearnProcessor

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CacheConfig
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.lambda_helper import Lambda
import sys
[3]:
!{sys.executable} -m pip install "sagemaker>=2.99.0"
Requirement already satisfied: sagemaker>=2.99.0 in /opt/conda/lib/python3.7/site-packages (2.99.0)
Requirement already satisfied: protobuf3-to-dict<1.0,>=0.1.5 in /opt/conda/lib/python3.7/site-packages (from sagemaker>=2.99.0) (0.1.5)
Requirement already satisfied: google-pasta in /opt/conda/lib/python3.7/site-packages (from sagemaker>=2.99.0) (0.2.0)
Requirement already satisfied: smdebug-rulesconfig==1.0.1 in /opt/conda/lib/python3.7/site-packages (from sagemaker>=2.99.0) (1.0.1)
Requirement already satisfied: numpy<2.0,>=1.9.0 in /opt/conda/lib/python3.7/site-packages (from sagemaker>=2.99.0) (1.21.1)
Requirement already satisfied: boto3<2.0,>=1.20.21 in /opt/conda/lib/python3.7/site-packages (from sagemaker>=2.99.0) (1.20.47)
Requirement already satisfied: attrs<22,>=20.3.0 in /opt/conda/lib/python3.7/site-packages (from sagemaker>=2.99.0) (21.4.0)
Requirement already satisfied: pandas in /opt/conda/lib/python3.7/site-packages (from sagemaker>=2.99.0) (1.0.1)
Requirement already satisfied: importlib-metadata<5.0,>=1.4.0 in /opt/conda/lib/python3.7/site-packages (from sagemaker>=2.99.0) (1.5.0)
Requirement already satisfied: protobuf<4.0,>=3.1 in /opt/conda/lib/python3.7/site-packages (from sagemaker>=2.99.0) (3.17.3)
Requirement already satisfied: pathos in /opt/conda/lib/python3.7/site-packages (from sagemaker>=2.99.0) (0.2.8)
Requirement already satisfied: packaging>=20.0 in /opt/conda/lib/python3.7/site-packages (from sagemaker>=2.99.0) (20.1)
Requirement already satisfied: botocore<1.24.0,>=1.23.47 in /opt/conda/lib/python3.7/site-packages (from boto3<2.0,>=1.20.21->sagemaker>=2.99.0) (1.23.47)
Requirement already satisfied: s3transfer<0.6.0,>=0.5.0 in /opt/conda/lib/python3.7/site-packages (from boto3<2.0,>=1.20.21->sagemaker>=2.99.0) (0.5.0)
Requirement already satisfied: jmespath<1.0.0,>=0.7.1 in /opt/conda/lib/python3.7/site-packages (from boto3<2.0,>=1.20.21->sagemaker>=2.99.0) (0.10.0)
Requirement already satisfied: urllib3<1.27,>=1.25.4 in /opt/conda/lib/python3.7/site-packages (from botocore<1.24.0,>=1.23.47->boto3<2.0,>=1.20.21->sagemaker>=2.99.0) (1.26.6)
Requirement already satisfied: python-dateutil<3.0.0,>=2.1 in /opt/conda/lib/python3.7/site-packages (from botocore<1.24.0,>=1.23.47->boto3<2.0,>=1.20.21->sagemaker>=2.99.0) (2.8.1)
Requirement already satisfied: zipp>=0.5 in /opt/conda/lib/python3.7/site-packages (from importlib-metadata<5.0,>=1.4.0->sagemaker>=2.99.0) (2.2.0)
Requirement already satisfied: six in /opt/conda/lib/python3.7/site-packages (from packaging>=20.0->sagemaker>=2.99.0) (1.14.0)
Requirement already satisfied: pyparsing>=2.0.2 in /opt/conda/lib/python3.7/site-packages (from packaging>=20.0->sagemaker>=2.99.0) (2.4.6)
Requirement already satisfied: pytz>=2017.2 in /opt/conda/lib/python3.7/site-packages (from pandas->sagemaker>=2.99.0) (2019.3)
Requirement already satisfied: ppft>=1.6.6.4 in /opt/conda/lib/python3.7/site-packages (from pathos->sagemaker>=2.99.0) (1.6.6.4)
Requirement already satisfied: pox>=0.3.0 in /opt/conda/lib/python3.7/site-packages (from pathos->sagemaker>=2.99.0) (0.3.0)
Requirement already satisfied: multiprocess>=0.70.12 in /opt/conda/lib/python3.7/site-packages (from pathos->sagemaker>=2.99.0) (0.70.12.2)
Requirement already satisfied: dill>=0.3.4 in /opt/conda/lib/python3.7/site-packages (from pathos->sagemaker>=2.99.0) (0.3.4)

Configuration Setup

Let’s now configure the setup we need, which includes the session object from the SageMaker Python SDK and the necessary configuration for the pipeline, such as instance types, input and output buckets, and so on.

[4]:
# Create the SageMaker Session

sagemaker_session = sagemaker.Session()
pipeline_session = PipelineSession()
sm_client = sagemaker_session.sagemaker_client
region = sagemaker_session.boto_region_name
prefix = "lambda-step-pipeline"

account_id = sagemaker_session.account_id()
[5]:
# Define variables and parameters needed for the Pipeline steps

role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
base_job_prefix = "lambda-step-example"
s3_prefix = "lambda-step-pipeline"

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputDataUrl",
    default_value="s3://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv",
)

# Cache Pipeline steps to reduce execution time on subsequent executions
cache_config = CacheConfig(enable_caching=True, expire_after="30d")
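
Each pipeline parameter declares a name and a default value, and an individual execution can override the defaults (in the SDK, through the `parameters` argument of `pipeline.start`). The plain-Python sketch below only illustrates that merge; `resolve_parameters` is a hypothetical helper, and the type check is an assumption of this sketch rather than a description of how the service validates values.

```python
# Defaults mirror the ParameterInteger/ParameterString objects defined above.
PARAMETER_DEFAULTS = {
    "ProcessingInstanceCount": 1,
    "TrainingInstanceType": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
    "InputDataUrl": "s3://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv",
}


def resolve_parameters(overrides=None):
    """Hypothetical helper: merge per-execution overrides into the defaults."""
    overrides = overrides or {}
    unknown = set(overrides) - set(PARAMETER_DEFAULTS)
    if unknown:
        raise KeyError(f"Unknown pipeline parameters: {sorted(unknown)}")
    for name, value in overrides.items():
        expected = type(PARAMETER_DEFAULTS[name])
        if not isinstance(value, expected):
            raise TypeError(f"{name} expects {expected.__name__}, got {type(value).__name__}")
    # Later keys win, so overrides replace the matching defaults.
    return {**PARAMETER_DEFAULTS, **overrides}


resolved = resolve_parameters({"TrainingInstanceType": "ml.m5.2xlarge"})
print(resolved["TrainingInstanceType"])     # ml.m5.2xlarge
print(resolved["ProcessingInstanceCount"])  # 1
```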

Data Preparation

An SKLearn processor is used to prepare the dataset for the training job. Using the script preprocess.py, the dataset is featurized and split into train, validation, and test datasets.

The output of this step is used as the input to the TrainingStep.
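
The preprocessing script in the next cells receives the dataset location as an S3 URI through `--input-data` and separates it into a bucket and a key with string splitting. As an alternative sketch (not the notebook's code), `urllib.parse.urlparse` performs the same split and makes it easy to reject malformed input; `split_s3_uri` is a hypothetical helper name.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split an s3://bucket/key URI into a (bucket, key) tuple."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_uri!r}")
    # The path keeps its leading slash, which is not part of the object key.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_uri(
    "s3://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv"
)
print(bucket)  # sagemaker-sample-files
print(key)     # datasets/tabular/uci_abalone/abalone.csv
```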

[6]:
!mkdir -p code
[7]:
%%writefile code/preprocess.py

"""Feature engineers the abalone dataset."""
import argparse
import logging
import os
import pathlib

import boto3
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


# Since we get a headerless CSV file we specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64,
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    """Merges two dicts, returning a new copy."""
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    logger.debug("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    args = parser.parse_args()

    base_dir = "/opt/ml/processing"
    pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True)
    input_data = args.input_data
    bucket = input_data.split("/")[2]
    key = "/".join(input_data.split("/")[3:])

    logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
    fn = f"{base_dir}/data/abalone-dataset.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, fn)

    logger.debug("Reading downloaded data.")
    df = pd.read_csv(
        fn,
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )
    os.unlink(fn)

    logger.debug("Defining transformers.")
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )

    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    logger.info("Applying transforms.")
    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    logger.info("Splitting %d rows of data into train, validation, test datasets.", len(X))
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    logger.info("Writing out datasets to %s.", base_dir)
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
Writing code/preprocess.py
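
The script shuffles the rows and then cuts them at 70% and 85% of the row count, which gives roughly a 70/15/15 train/validation/test split. The standard-library sketch below reproduces only that index arithmetic; `split_rows` and the fixed seed are assumptions made for illustration, and the row count of 4,177 is the size of the UCI Abalone dataset.

```python
import random


def split_rows(rows, cuts=(0.7, 0.85), seed=0):
    """Shuffle rows, then cut at 70% and 85% of the length as preprocess.py does."""
    shuffled = list(rows)
    # A fixed seed is an assumption of this sketch; the script shuffles unseeded.
    random.Random(seed).shuffle(shuffled)
    first_cut = int(cuts[0] * len(shuffled))
    second_cut = int(cuts[1] * len(shuffled))
    return shuffled[:first_cut], shuffled[first_cut:second_cut], shuffled[second_cut:]


train, validation, test = split_rows(range(4177))
print(len(train), len(validation), len(test))  # 2923 627 627
```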
[8]:
# Preprocess the dataset with a Python script and
# split it into train, validation, and test datasets

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

processor_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocess.py",
    arguments=["--input-data", input_data],
)

step_process = ProcessingStep(
    name="PreprocessAbaloneData",
    step_args=processor_args,
    cache_config=cache_config,
)
/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline_context.py:197: UserWarning: Running within a PipelineSession, there will be No Wait, No Logs, and No Job being started.
  UserWarning,

Job Name:  lambda-step-example/sklearn-abalone-pre-2022-07-13-15-58-56-005
Inputs:  [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/lambda-step-example/sklearn-abalone-pre-2022-07-13-15-58-56-005/input/code/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/lambda-step-example/sklearn-abalone-pre-2022-07-13-15-58-56-005/output/train', 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'validation', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/lambda-step-example/sklearn-abalone-pre-2022-07-13-15-58-56-005/output/validation', 'LocalPath': '/opt/ml/processing/validation', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'test', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/lambda-step-example/sklearn-abalone-pre-2022-07-13-15-58-56-005/output/test', 'LocalPath': '/opt/ml/processing/test', 'S3UploadMode': 'EndOfJob'}}]

Model Training and Evaluation

We will now train an XGBoost model using the SageMaker Python SDK and the output of the ProcessingStep.

Training the Model

[9]:
# Define the output path for the model artifacts from the training job
model_path = f"s3://{default_bucket}/{base_job_prefix}/AbaloneTrain"

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    base_job_name=f"{prefix}/{base_job_prefix}/xgboost-abalone-train",
    sagemaker_session=pipeline_session,
    role=role,
)

xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=train_args,
    cache_config=cache_config,
)
/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/steps.py:391: UserWarning: Profiling is enabled on the provided estimator. The default profiler rule includes a timestamp which will change each time the pipeline is upserted, causing cache misses. If profiling is not needed, set disable_profiler to True on the estimator.
  warnings.warn(msg)
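
Values such as `step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri` are not S3 URIs when the pipeline is defined. They are placeholders that are resolved once the referenced step has run, and they show up in the pipeline definition as `{'Get': ...}` expressions. The toy class below imitates that idea by recording the access path; `PropertyRef` is a hypothetical stand-in for illustration and is not the SDK's implementation.

```python
class PropertyRef:
    """Hypothetical stand-in that records an access path instead of holding a value."""

    def __init__(self, path):
        self._path = path

    def __getattr__(self, name):
        # Only called for attributes that are not found normally.
        if name.startswith("_"):
            raise AttributeError(name)
        return PropertyRef(f"{self._path}.{name}")

    def __getitem__(self, key):
        return PropertyRef(f"{self._path}['{key}']")

    def to_request(self):
        """Render the recorded path in the shape seen in the pipeline definition."""
        return {"Get": self._path}


step_properties = PropertyRef("Steps.PreprocessAbaloneData")
train_uri = step_properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
print(train_uri.to_request())
```

Because the reference names the producing step, it also tells the pipeline that the consuming step depends on that step.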

Evaluating the model

Use a processing job to evaluate the model from the TrainingStep. If the mean squared error (MSE) reported by the evaluation is at or below the threshold checked in the condition step, a model is created and a Lambda function is invoked to deploy the model to a SageMaker Endpoint.

[10]:
%%writefile code/evaluate.py

"""Evaluation script for measuring mean squared error."""
import json
import logging
import pathlib
import pickle
import tarfile

import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


if __name__ == "__main__":
    logger.debug("Starting evaluation.")
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    logger.debug("Loading xgboost model.")
    with open("xgboost-model", "rb") as model_file:
        model = pickle.load(model_file)

    logger.debug("Reading test data.")
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    logger.debug("Splitting test data into labels and features.")
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)

    logger.info("Performing predictions against test data.")
    predictions = model.predict(X_test)

    logger.debug("Calculating mean squared error.")
    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info("Writing out evaluation report with mse: %f", mse)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))
Writing code/evaluate.py
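
To make the shape of the evaluation report concrete, the sketch below computes the same two numbers with the standard library only: the mean squared error and the standard deviation of the residuals. `regression_report` is a hypothetical helper and the sample values are made up; the script itself uses scikit-learn and NumPy.

```python
import json
import math


def regression_report(y_true, y_pred):
    """Build a report dict with the same shape that evaluate.py writes."""
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    mse = sum(r * r for r in residuals) / len(residuals)
    mean_residual = sum(residuals) / len(residuals)
    # Population standard deviation, matching np.std's default (ddof=0).
    std = math.sqrt(sum((r - mean_residual) ** 2 for r in residuals) / len(residuals))
    return {"regression_metrics": {"mse": {"value": mse, "standard_deviation": std}}}


report = regression_report([10.0, 8.0, 12.0], [9.0, 8.0, 14.0])
print(json.dumps(report, indent=2))
```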
[11]:
# A ProcessingStep is used to evaluate the performance of the trained model.
# Based on the results of the evaluation, the model is created and deployed.

script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name=f"{prefix}/{base_job_prefix}/xgboost-abalone-eval",
    sagemaker_session=pipeline_session,
    role=role,
)

evaluation_report = PropertyFile(
    name="AbaloneEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=f"s3://{default_bucket}/{s3_prefix}/evaluation_report",
        ),
    ],
    code="code/evaluate.py",
)
step_eval = ProcessingStep(
    name="EvaluateAbaloneModel",
    step_args=eval_args,
    property_files=[evaluation_report],
    cache_config=cache_config,
)

Job Name:  lambda-step-pipeline/lambda-step-exampl-2022-07-13-15-58-56-545
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7f15b0ee9350>, 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7f15b0f199d0>, 'LocalPath': '/opt/ml/processing/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/lambda-step-pipeline/lambda-step-exampl-2022-07-13-15-58-56-545/input/code/evaluate.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'evaluation', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/lambda-step-pipeline/evaluation_report', 'LocalPath': '/opt/ml/processing/evaluation', 'S3UploadMode': 'EndOfJob'}}]

Creating the final model object

The model is created and the name of the model is provided to the Lambda function for deployment. The ModelStep dynamically assigns a name to the model.

[12]:
# Create Model
model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
    predictor_cls=XGBoostPredictor,
)

step_create_model = ModelStep(
    name="CreateModel",
    step_args=model.create("ml.m4.large"),
)

Setting up Lambda

When defining the LambdaStep, the SageMaker Lambda helper class provides helper functions for creating the Lambda function. You can either point the Lambda helper class at an already deployed Lambda function by providing its function ARN, or have the helper create a new Lambda function by providing a script, function name, and role. In both cases, the resulting Lambda object is passed to the step through the lambda_func argument.

Inputs are passed to the Lambda function through the step's inputs argument. Inside the Lambda function’s handler, they are available in the event argument.

The dictionary response from the Lambda function is parsed through the LambdaOutput objects provided to the outputs argument. The output_name in LambdaOutput corresponds to the dictionary key in the Lambda’s return dictionary.
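
The flow of inputs and outputs can be tried locally without AWS. In the sketch below, a toy handler reads its inputs from `event`, and a small helper picks the declared output names out of the returned dictionary. `example_handler`, `collect_outputs`, and the sample names are hypothetical, and raising on a missing key is a choice made for this sketch, not documented service behavior.

```python
def example_handler(event, context):
    """Toy handler: reads its inputs from `event` and returns a flat dict."""
    return {
        "statusCode": 200,
        "body": f"Would deploy {event['model_name']} to {event['endpoint_name']}",
    }


def collect_outputs(response, output_names):
    """Hypothetical helper: keep only the declared output names from the response."""
    missing = [name for name in output_names if name not in response]
    if missing:
        raise KeyError(f"Handler response is missing declared outputs: {missing}")
    return {name: response[name] for name in output_names}


# The dict passed as `inputs` to the step becomes the handler's `event`.
event = {"model_name": "demo-model", "endpoint_name": "demo-endpoint"}
response = example_handler(event, context=None)

# Each declared output name must be a key of the handler's return dict.
outputs = collect_outputs(response, ["statusCode", "body"])
print(outputs)
```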

Define the Lambda function

Users can choose to leverage the Lambda helper class to create a Lambda function and provide that function object to the LambdaStep. Alternatively, users can use a pre-deployed Lambda function and provide the function ARN to the Lambda helper class in the Lambda step.

[13]:
%%writefile code/lambda_helper.py

"""
This Lambda function creates an Endpoint Configuration and deploys a model to an Endpoint.
The name of the model to deploy is provided via the `event` argument
"""

import json
import boto3


def lambda_handler(event, context):
    """Create an endpoint configuration and an endpoint for the model named in `event`."""
    sm_client = boto3.client("sagemaker")

    # The name of the model created in the Pipeline ModelStep
    model_name = event["model_name"]

    endpoint_config_name = event["endpoint_config_name"]
    endpoint_name = event["endpoint_name"]

    create_endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "InstanceType": "ml.m4.xlarge",
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "ModelName": model_name,
                "VariantName": "AllTraffic",
            }
        ],
    )

    create_endpoint_response = sm_client.create_endpoint(
        EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
    )

    return {
        "statusCode": 200,
        "body": json.dumps("Created Endpoint!"),
        "other_key": "example_value",
    }
Writing code/lambda_helper.py

Setting up the custom IAM Role

The Lambda function needs an IAM role that allows it to deploy a SageMaker Endpoint. The role ARN must be provided in the LambdaStep.

The Lambda role should at minimum have policies to allow sagemaker:CreateModel, sagemaker:CreateEndpointConfig, and sagemaker:CreateEndpoint, in addition to the basic Lambda execution policies.

A helper function in iam_helper.py is available to create the Lambda function role. Please note that the role uses the Amazon managed policy AmazonSageMakerFullAccess. This should be replaced with a least-privilege IAM policy, as recommended by AWS IAM best practices.
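
As a starting point for a narrower policy, the sketch below builds a policy document that contains only the three SageMaker actions listed above. It is an assumption-laden outline rather than a vetted policy: `"Resource": "*"` is a placeholder that should be scoped to specific ARNs, and the basic Lambda execution permissions still have to be granted separately.

```python
import json

# Actions taken from the minimum permissions listed above.
# "Resource": "*" is a placeholder assumption; scope it to specific ARNs in practice.
deploy_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:CreateModel",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:CreateEndpoint",
            ],
            "Resource": "*",
        }
    ],
}

policy_json = json.dumps(deploy_policy, indent=4)
print(policy_json)
```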

[14]:
from iam_helper import create_lambda_role

lambda_role = create_lambda_role("lambda-deployment-role")
Using ARN from existing role: lambda-deployment-role
[15]:
# Custom Lambda Step

current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())
model_name = "demo-lambda-model" + current_time
endpoint_config_name = "demo-lambda-deploy-endpoint-config-" + current_time
endpoint_name = "demo-lambda-deploy-endpoint-" + current_time

function_name = "sagemaker-lambda-step-endpoint-deploy-" + current_time

# Lambda helper class can be used to create the Lambda function
func = Lambda(
    function_name=function_name,
    execution_role_arn=lambda_role,
    script="code/lambda_helper.py",
    handler="lambda_helper.lambda_handler",
)

output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)
output_param_3 = LambdaOutput(output_name="other_key", output_type=LambdaOutputTypeEnum.String)

step_deploy_lambda = LambdaStep(
    name="LambdaStep",
    lambda_func=func,
    inputs={
        "model_name": step_create_model.properties.ModelName,
        "endpoint_config_name": endpoint_config_name,
        "endpoint_name": endpoint_name,
    },
    outputs=[output_param_1, output_param_2, output_param_3],
)
[16]:
# ConditionStep for evaluating model quality and branching execution.
# The `json_path` value is based on the `report_dict` variable in `evaluate.py`

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=6.0,
)

step_cond = ConditionStep(
    name="CheckMSEAbaloneEvaluation",
    conditions=[cond_lte],
    if_steps=[step_create_model, step_deploy_lambda],
    else_steps=[],
)
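
The condition reads one value out of the evaluation report by its JSON path and compares it with the threshold. The sketch below mimics that lookup on a local dictionary; `get_by_path` is a hypothetical helper, and the report values are made up for illustration.

```python
import json
from functools import reduce


def get_by_path(document, dotted_path):
    """Walk a nested dict using a dot-separated path such as 'a.b.c'."""
    return reduce(lambda node, key: node[key], dotted_path.split("."), document)


# Same shape as the evaluation.json written by evaluate.py; the numbers are made up.
evaluation_json = '{"regression_metrics": {"mse": {"value": 4.9, "standard_deviation": 2.2}}}'
report = json.loads(evaluation_json)

mse = get_by_path(report, "regression_metrics.mse.value")
MSE_THRESHOLD = 6.0  # the `right` operand of the condition above

# An MSE at or below the threshold selects the if_steps branch.
branch = "if_steps" if mse <= MSE_THRESHOLD else "else_steps"
print(mse, branch)  # 4.9 if_steps
```

With an empty `else_steps` list, an MSE above the threshold simply ends the pipeline without creating or deploying a model.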
[17]:
# Reuse the same pipeline name across executions to benefit from step caching.
# Note that the timestamp suffix below makes the name unique for each notebook run.

pipeline_name = "lambda-step-pipeline" + current_time

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        training_instance_type,
        input_data,
        model_approval_status,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)
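
The execution order follows the data dependencies between steps rather than the order of the `steps` list. The sketch below writes down the dependencies implied by the property references and condition branches in the cells above and sorts them with the standard library's `graphlib` (Python 3.9 or later). The dependency map is a reading of this notebook made for illustration, not output from the SDK.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each step maps to the steps it depends on, as implied by the property
# references and the condition branches in the cells above.
dependencies = {
    "PreprocessAbaloneData": set(),
    "TrainAbaloneModel": {"PreprocessAbaloneData"},
    "EvaluateAbaloneModel": {"PreprocessAbaloneData", "TrainAbaloneModel"},
    "CheckMSEAbaloneEvaluation": {"EvaluateAbaloneModel"},
    "CreateModel": {"CheckMSEAbaloneEvaluation", "TrainAbaloneModel"},
    "LambdaStep": {"CreateModel"},
}

# static_order yields every step after all of the steps it depends on.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```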

Execute the Pipeline

[18]:
import json

definition = json.loads(pipeline.definition())
definition
[18]:
{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'InputDataUrl',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'PreprocessAbaloneData',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
     'ContainerArguments': ['--input-data',
      {'Get': 'Parameters.InputDataUrl'}],
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/preprocess.py']},
    'RoleArn': 'arn:aws:iam::000000000000:role/SageMakerRole',
    'ProcessingInputs': [{'InputName': 'code',
      'AppManaged': False,
      'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/lambda-step-example/sklearn-abalone-pre-2022-07-13-15-58-56-005/input/code/preprocess.py',
       'LocalPath': '/opt/ml/processing/input/code',
       'S3DataType': 'S3Prefix',
       'S3InputMode': 'File',
       'S3DataDistributionType': 'FullyReplicated',
       'S3CompressionType': 'None'}}],
    'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'train',
       'AppManaged': False,
       'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/lambda-step-example/sklearn-abalone-pre-2022-07-13-15-58-56-005/output/train',
        'LocalPath': '/opt/ml/processing/train',
        'S3UploadMode': 'EndOfJob'}},
      {'OutputName': 'validation',
       'AppManaged': False,
       'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/lambda-step-example/sklearn-abalone-pre-2022-07-13-15-58-56-005/output/validation',
        'LocalPath': '/opt/ml/processing/validation',
        'S3UploadMode': 'EndOfJob'}},
      {'OutputName': 'test',
       'AppManaged': False,
       'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/lambda-step-example/sklearn-abalone-pre-2022-07-13-15-58-56-005/output/test',
        'LocalPath': '/opt/ml/processing/test',
        'S3UploadMode': 'EndOfJob'}}]}},
   'CacheConfig': {'Enabled': True, 'ExpireAfter': '30d'}},
  {'Name': 'TrainAbaloneModel',
   'Type': 'Training',
   'Arguments': {'AlgorithmSpecification': {'TrainingInputMode': 'File',
     'TrainingImage': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3'},
    'OutputDataConfig': {'S3OutputPath': 's3://sagemaker-us-west-2-000000000000/lambda-step-example/AbaloneTrain'},
    'StoppingCondition': {'MaxRuntimeInSeconds': 86400},
    'ResourceConfig': {'VolumeSizeInGB': 30,
     'InstanceCount': 1,
     'InstanceType': {'Get': 'Parameters.TrainingInstanceType'}},
    'RoleArn': 'arn:aws:iam::000000000000:role/SageMakerRole',
    'InputDataConfig': [{'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix',
        'S3Uri': {'Get': "Steps.PreprocessAbaloneData.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri"},
        'S3DataDistributionType': 'FullyReplicated'}},
      'ContentType': 'text/csv',
      'ChannelName': 'train'},
     {'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix',
        'S3Uri': {'Get': "Steps.PreprocessAbaloneData.ProcessingOutputConfig.Outputs['validation'].S3Output.S3Uri"},
        'S3DataDistributionType': 'FullyReplicated'}},
      'ContentType': 'text/csv',
      'ChannelName': 'validation'}],
    'HyperParameters': {'objective': 'reg:linear',
     'num_round': '50',
     'max_depth': '5',
     'eta': '0.2',
     'gamma': '4',
     'min_child_weight': '6',
     'subsample': '0.7',
     'silent': '0'},
    'ProfilerRuleConfigurations': [{'RuleConfigurationName': 'ProfilerReport-1657727936',
      'RuleEvaluatorImage': '895741380848.dkr.ecr.us-west-2.amazonaws.com/sagemaker-debugger-rules:latest',
      'RuleParameters': {'rule_to_invoke': 'ProfilerReport'}}],
    'ProfilerConfig': {'S3OutputPath': 's3://sagemaker-us-west-2-000000000000/lambda-step-example/AbaloneTrain'}},
   'CacheConfig': {'Enabled': True, 'ExpireAfter': '30d'}},
  {'Name': 'EvaluateAbaloneModel',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/evaluate.py']},
    'RoleArn': 'arn:aws:iam::000000000000:role/SageMakerRole',
    'ProcessingInputs': [{'InputName': 'input-1',
      'AppManaged': False,
      'S3Input': {'S3Uri': {'Get': 'Steps.TrainAbaloneModel.ModelArtifacts.S3ModelArtifacts'},
       'LocalPath': '/opt/ml/processing/model',
       'S3DataType': 'S3Prefix',
       'S3InputMode': 'File',
       'S3DataDistributionType': 'FullyReplicated',
       'S3CompressionType': 'None'}},
     {'InputName': 'input-2',
      'AppManaged': False,
      'S3Input': {'S3Uri': {'Get': "Steps.PreprocessAbaloneData.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri"},
       'LocalPath': '/opt/ml/processing/test',
       'S3DataType': 'S3Prefix',
       'S3InputMode': 'File',
       'S3DataDistributionType': 'FullyReplicated',
       'S3CompressionType': 'None'}},
     {'InputName': 'code',
      'AppManaged': False,
      'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/lambda-step-pipeline/lambda-step-exampl-2022-07-13-15-58-56-545/input/code/evaluate.py',
       'LocalPath': '/opt/ml/processing/input/code',
       'S3DataType': 'S3Prefix',
       'S3InputMode': 'File',
       'S3DataDistributionType': 'FullyReplicated',
       'S3CompressionType': 'None'}}],
    'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'evaluation',
       'AppManaged': False,
       'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/lambda-step-pipeline/evaluation_report',
        'LocalPath': '/opt/ml/processing/evaluation',
        'S3UploadMode': 'EndOfJob'}}]}},
   'CacheConfig': {'Enabled': True, 'ExpireAfter': '30d'},
   'PropertyFiles': [{'PropertyFileName': 'AbaloneEvaluationReport',
     'OutputName': 'evaluation',
     'FilePath': 'evaluation.json'}]},
  {'Name': 'CheckMSEAbaloneEvaluation',
   'Type': 'Condition',
   'Arguments': {'Conditions': [{'Type': 'LessThanOrEqualTo',
      'LeftValue': {'Std:JsonGet': {'PropertyFile': {'Get': 'Steps.EvaluateAbaloneModel.PropertyFiles.AbaloneEvaluationReport'},
        'Path': 'regression_metrics.mse.value'}},
      'RightValue': 6.0}],
    'IfSteps': [{'Name': 'CreateModel-CreateModel',
      'Type': 'Model',
      'Arguments': {'ExecutionRoleArn': 'arn:aws:iam::000000000000:role/SageMakerRole',
       'PrimaryContainer': {'Image': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3',
        'Environment': {},
        'ModelDataUrl': {'Get': 'Steps.TrainAbaloneModel.ModelArtifacts.S3ModelArtifacts'}}}},
     {'Name': 'LambdaStep',
      'Type': 'Lambda',
      'Arguments': {'model_name': {'Get': 'Steps.CreateModel-CreateModel.ModelName'},
       'endpoint_config_name': 'demo-lambda-deploy-endpoint-config-07-13-15-58-57',
       'endpoint_name': 'demo-lambda-deploy-endpoint-07-13-15-58-57'},
      'FunctionArn': 'arn:aws:lambda:us-west-2:000000000000:function:sagemaker-lambda-step-endpoint-deploy-07-13-15-58-57',
      'OutputParameters': [{'OutputName': 'statusCode',
        'OutputType': 'String'},
       {'OutputName': 'body', 'OutputType': 'String'},
       {'OutputName': 'other_key', 'OutputType': 'String'}]}],
    'ElseSteps': []}}]}
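The printed definition is deeply nested, and the steps inside the condition's `IfSteps` branch are easy to overlook. The sketch below walks a definition dictionary and lists every step name, descending into condition branches. It assumes the definition has a top-level `Steps` list and that branch steps sit under `Arguments`, as in the output above. The helper `iter_step_names` and the trimmed `definition` dictionary are illustrative, not part of the original notebook.

```python
def iter_step_names(steps):
    """Yield step names depth-first, descending into Condition branches."""
    for step in steps:
        yield step["Name"]
        if step.get("Type") == "Condition":
            args = step.get("Arguments", {})
            # In the printed definition, branch steps live under Arguments.
            yield from iter_step_names(args.get("IfSteps", []))
            yield from iter_step_names(args.get("ElseSteps", []))


# Trimmed stand-in for the parsed pipeline definition: only names and types are kept.
definition = {
    "Steps": [
        {"Name": "PreprocessAbaloneData", "Type": "Processing"},
        {"Name": "TrainAbaloneModel", "Type": "Training"},
        {"Name": "EvaluateAbaloneModel", "Type": "Processing"},
        {
            "Name": "CheckMSEAbaloneEvaluation",
            "Type": "Condition",
            "Arguments": {
                "IfSteps": [
                    {"Name": "CreateModel-CreateModel", "Type": "Model"},
                    {"Name": "LambdaStep", "Type": "Lambda"},
                ],
                "ElseSteps": [],
            },
        },
    ]
}

step_names = list(iter_step_names(definition["Steps"]))
print(step_names)
```

With the real pipeline, the same helper could be applied to `json.loads(pipeline.definition())["Steps"]`.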
[19]:
pipeline.upsert(role_arn=role)
[19]:
{'PipelineArn': 'arn:aws:sagemaker:us-west-2:000000000000:pipeline/lambda-step-pipeline07-13-15-58-57',
 'ResponseMetadata': {'RequestId': 'b8946d56-ca49-4275-add7-d48f765b4931',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'b8946d56-ca49-4275-add7-d48f765b4931',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '102',
   'date': 'Wed, 13 Jul 2022 15:58:59 GMT'},
  'RetryAttempts': 0}}
[20]:
execution = pipeline.start()
[21]:
execution.wait()
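Once `execution.wait()` returns, it can help to check how each step ended before relying on the endpoint. The sketch below summarizes step records of the kind returned by `execution.list_steps()`, assuming each record carries `StepName` and `StepStatus` and, for failed steps, a `FailureReason`. The helper `summarize_steps` and the sample records are illustrative and are not output from a real run.

```python
def summarize_steps(steps):
    """Return (status_by_step, failures) for a list of step records."""
    status_by_step = {s["StepName"]: s["StepStatus"] for s in steps}
    failures = {
        s["StepName"]: s.get("FailureReason", "")
        for s in steps
        if s["StepStatus"] == "Failed"
    }
    return status_by_step, failures


# Illustrative records only; a real call would pass execution.list_steps().
sample_steps = [
    {"StepName": "TrainAbaloneModel", "StepStatus": "Succeeded"},
    {
        "StepName": "LambdaStep",
        "StepStatus": "Failed",
        "FailureReason": "example failure message",
    },
]

status_by_step, failures = summarize_steps(sample_steps)
print(status_by_step)
print(failures)
```

In the notebook this would be called as `summarize_steps(execution.list_steps())`.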
[22]:
# Create a SageMaker client
sm_client = sagemaker.Session().sagemaker_client

# Wait for the endpoint to be in service
waiter = sm_client.get_waiter("endpoint_in_service")
waiter.wait(EndpointName=endpoint_name)
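The waiter above polls with the default settings. If endpoint creation is expected to take longer or shorter, boto3 waiters accept a `WaiterConfig` argument with `Delay` (seconds between polls) and `MaxAttempts`. The following is a minimal sketch under that assumption. The helper `wait_for_endpoint`, its default values, and the stub classes are hypothetical; the stubs exist only so the example runs without AWS access.

```python
def wait_for_endpoint(sm_client, endpoint_name, delay=30, max_attempts=60):
    """Block until the endpoint is in service, polling every `delay` seconds."""
    waiter = sm_client.get_waiter("endpoint_in_service")
    waiter.wait(
        EndpointName=endpoint_name,
        WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
    )


class StubWaiter:
    """Offline stand-in for a boto3 waiter; records the arguments it receives."""

    def __init__(self):
        self.calls = []

    def wait(self, **kwargs):
        self.calls.append(kwargs)


class StubClient:
    """Offline stand-in for the SageMaker client."""

    def __init__(self):
        self.waiter = StubWaiter()
        self.requested = []

    def get_waiter(self, name):
        self.requested.append(name)
        return self.waiter


stub = StubClient()
wait_for_endpoint(stub, "demo-endpoint", delay=15, max_attempts=40)
print(stub.requested, stub.waiter.calls)
```

With the real client, the call would be `wait_for_endpoint(sm_client, endpoint_name)`.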

Clean up resources

Running the following cell deletes these resources created in this notebook:

* SageMaker Model
* SageMaker Endpoint Configuration
* SageMaker Endpoint
* SageMaker Pipeline
* Lambda Function

[23]:
# Get the model name from the EndpointConfig. The CreateModelStep properties are not available
# outside the Pipeline execution context so `step_create_model.properties.ModelName`
# cannot be used while deleting the model.

model_name = sm_client.describe_endpoint_config(EndpointConfigName=endpoint_config_name)[
    "ProductionVariants"
][0]["ModelName"]

# Delete the Model
sm_client.delete_model(ModelName=model_name)

# Delete the EndpointConfig
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)

# Delete the Endpoint
sm_client.delete_endpoint(EndpointName=endpoint_name)

# Delete the Lambda function
func.delete()

# Delete the Pipeline
sm_client.delete_pipeline(PipelineName=pipeline_name)
[23]:
{'PipelineArn': 'arn:aws:sagemaker:us-west-2:000000000000:pipeline/lambda-step-pipeline07-13-15-58-57',
 'ResponseMetadata': {'RequestId': 'f71a3ce5-bac1-4250-9129-4161507f62d4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f71a3ce5-bac1-4250-9129-4161507f62d4',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '102',
   'date': 'Wed, 13 Jul 2022 16:16:08 GMT'},
  'RetryAttempts': 0}}
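An alternative cleanup order, sketched below, removes the endpoint first and waits for the deletion to finish before removing the endpoint configuration and model it referenced. It assumes the boto3 SageMaker client's `endpoint_deleted` waiter. The helper `delete_endpoint_stack` and the `RecordingClient` stub are hypothetical; the stub records calls instead of contacting AWS so the example runs offline.

```python
def delete_endpoint_stack(sm_client, endpoint_name, endpoint_config_name):
    """Delete the endpoint, then its configuration and model; return the model name."""
    # Look up the model name before anything is deleted.
    config = sm_client.describe_endpoint_config(EndpointConfigName=endpoint_config_name)
    model_name = config["ProductionVariants"][0]["ModelName"]

    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.get_waiter("endpoint_deleted").wait(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    sm_client.delete_model(ModelName=model_name)
    return model_name


class RecordingClient:
    """Offline stand-in for the SageMaker client; records calls in order."""

    def __init__(self):
        self.calls = []

    def describe_endpoint_config(self, EndpointConfigName):
        self.calls.append("describe_endpoint_config")
        return {"ProductionVariants": [{"ModelName": "demo-model"}]}

    def delete_endpoint(self, EndpointName):
        self.calls.append("delete_endpoint")

    def delete_endpoint_config(self, EndpointConfigName):
        self.calls.append("delete_endpoint_config")

    def delete_model(self, ModelName):
        self.calls.append("delete_model:" + ModelName)

    def get_waiter(self, name):
        self.calls.append("get_waiter:" + name)
        return self

    def wait(self, **kwargs):
        self.calls.append("wait")


client = RecordingClient()
deleted_model = delete_endpoint_stack(client, "demo-endpoint", "demo-endpoint-config")
print(deleted_model, client.calls)
```

The Lambda function and the pipeline would still be deleted separately, as in the cell above.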