Develop, Train, Optimize and Deploy Scikit-Learn Random Forest

In this notebook we show how to use Amazon SageMaker to develop, train, tune and deploy a Scikit-Learn based ML model (Random Forest). More info on Scikit-Learn can be found here https://scikit-learn.org/stable/index.html. We use the California Housing dataset, present in Scikit-Learn: https://scikit-learn.org/stable/modules/generated/sklearn.datasets.fetch_california_housing.html. The California Housing dataset was originally published in:

Pace, R. Kelley, and Ronald Barry. “Sparse spatial autoregressions.” Statistics & Probability Letters 33.3 (1997): 291-297.

This sample is provided for demonstration purposes, make sure to conduct appropriate testing if derivating this code for your own use-cases!

[ ]:
import datetime
import time
import tarfile

import boto3
import pandas as pd
import numpy as np
from sagemaker import get_execution_role
import sagemaker
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_california_housing


sm_boto3 = boto3.client("sagemaker")

sess = sagemaker.Session()

region = sess.boto_session.region_name

bucket = sess.default_bucket()  # this could also be a hard-coded bucket name

print("Using bucket " + bucket)

Prepare data

We load a dataset from sklearn, split it and send it to S3

[ ]:
# we use the California housing dataset
data = fetch_california_housing()
[ ]:
X_train, X_test, y_train, y_test = train_test_split(
    data.data, data.target, test_size=0.25, random_state=42
)

trainX = pd.DataFrame(X_train, columns=data.feature_names)
trainX["target"] = y_train

testX = pd.DataFrame(X_test, columns=data.feature_names)
testX["target"] = y_test
[ ]:
trainX.head()
[ ]:
trainX.to_csv("california_housing_train.csv")
testX.to_csv("california_housing_test.csv")
[ ]:
# send data to S3. SageMaker will take training data from s3
trainpath = sess.upload_data(
    path="california_housing_train.csv", bucket=bucket, key_prefix="sagemaker/sklearncontainer"
)

testpath = sess.upload_data(
    path="california_housing_test.csv", bucket=bucket, key_prefix="sagemaker/sklearncontainer"
)

Writing a Script Mode script

The below script contains both training and inference functionality and can run both in SageMaker Training hardware or locally (desktop, SageMaker notebook, on prem, etc). Detailed guidance here https://sagemaker.readthedocs.io/en/stable/using_sklearn.html#preparing-the-scikit-learn-training-script

[ ]:
%%writefile script.py

import argparse
import joblib
import os

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor


# inference functions ---------------
def model_fn(model_dir):
    clf = joblib.load(os.path.join(model_dir, "model.joblib"))
    return clf


if __name__ == "__main__":

    print("extracting arguments")
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
    # to simplify the demo we don't use all sklearn RandomForest hyperparameters
    parser.add_argument("--n-estimators", type=int, default=10)
    parser.add_argument("--min-samples-leaf", type=int, default=3)

    # Data, model, and output directories
    parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
    parser.add_argument("--train-file", type=str, default="california_housing_train.csv")
    parser.add_argument("--test-file", type=str, default="california_housing_test.csv")
    parser.add_argument(
        "--features", type=str
    )  # in this script we ask user to explicitly name features
    parser.add_argument(
        "--target", type=str
    )  # in this script we ask user to explicitly name the target

    args, _ = parser.parse_known_args()

    print("reading data")
    train_df = pd.read_csv(os.path.join(args.train, args.train_file))
    test_df = pd.read_csv(os.path.join(args.test, args.test_file))

    print("building training and testing datasets")
    X_train = train_df[args.features.split()]
    X_test = test_df[args.features.split()]
    y_train = train_df[args.target]
    y_test = test_df[args.target]

    # train
    print("training model")
    model = RandomForestRegressor(
        n_estimators=args.n_estimators, min_samples_leaf=args.min_samples_leaf, n_jobs=-1
    )

    model.fit(X_train, y_train)

    # print abs error
    print("validating model")
    abs_err = np.abs(model.predict(X_test) - y_test)

    # print couple perf metrics
    for q in [10, 50, 90]:
        print("AE-at-" + str(q) + "th-percentile: " + str(np.percentile(a=abs_err, q=q)))

    # persist model
    path = os.path.join(args.model_dir, "model.joblib")
    joblib.dump(model, path)
    print("model persisted at " + path)
    print(args.min_samples_leaf)

Local training

Script arguments allows us to remove from the script any SageMaker-specific configuration, and run locally

[ ]:
! python script.py --n-estimators 100 \
                   --min-samples-leaf 2 \
                   --model-dir ./ \
                   --train ./ \
                   --test ./ \
                   --features 'MedInc HouseAge AveRooms AveBedrms Population AveOccup Latitude Longitude' \
                   --target target

SageMaker Training

Launching a training job with the Python SDK

[ ]:
# We use the Estimator from the SageMaker Python SDK
from sagemaker.sklearn.estimator import SKLearn

FRAMEWORK_VERSION = "0.23-1"

sklearn_estimator = SKLearn(
    entry_point="script.py",
    role=get_execution_role(),
    instance_count=1,
    instance_type="ml.c5.xlarge",
    framework_version=FRAMEWORK_VERSION,
    base_job_name="rf-scikit",
    metric_definitions=[{"Name": "median-AE", "Regex": "AE-at-50th-percentile: ([0-9.]+).*$"}],
    hyperparameters={
        "n-estimators": 100,
        "min-samples-leaf": 3,
        "features": "MedInc HouseAge AveRooms AveBedrms Population AveOccup Latitude Longitude",
        "target": "target",
    },
)
[ ]:
# launch training job, with asynchronous call
sklearn_estimator.fit({"train": trainpath, "test": testpath}, wait=False)

Alternative: launching a training with boto3

boto3 is more verbose yet gives more visibility in the low-level details of Amazon SageMaker

[ ]:
# first compress the code and send to S3

source = "source.tar.gz"
project = "scikitlearn-train-from-boto3"

tar = tarfile.open(source, "w:gz")
tar.add("script.py")
tar.close()

s3 = boto3.client("s3")
s3.upload_file(source, bucket, project + "/" + source)

When using boto3 to launch a training job we must explicitly point to a docker image.

[ ]:
from sagemaker import image_uris


training_image = image_uris.retrieve(
    framework="sklearn",
    region=region,
    version=FRAMEWORK_VERSION,
    py_version="py3",
    instance_type="ml.c5.xlarge",
)
print(training_image)
[ ]:
# launch training job

response = sm_boto3.create_training_job(
    TrainingJobName="sklearn-boto3-" + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
    HyperParameters={
        "n_estimators": "300",
        "min_samples_leaf": "3",
        "sagemaker_program": "script.py",
        "features": "MedInc HouseAge AveRooms AveBedrms Population AveOccup Latitude Longitude",
        "target": "target",
        "sagemaker_submit_directory": "s3://" + bucket + "/" + project + "/" + source,
    },
    AlgorithmSpecification={
        "TrainingImage": training_image,
        "TrainingInputMode": "File",
        "MetricDefinitions": [
            {"Name": "median-AE", "Regex": "AE-at-50th-percentile: ([0-9.]+).*$"},
        ],
    },
    RoleArn=get_execution_role(),
    InputDataConfig=[
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": trainpath,
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
        },
        {
            "ChannelName": "test",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": testpath,
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
        },
    ],
    OutputDataConfig={"S3OutputPath": "s3://" + bucket + "/sagemaker-sklearn-artifact/"},
    ResourceConfig={"InstanceType": "ml.c5.xlarge", "InstanceCount": 1, "VolumeSizeInGB": 10},
    StoppingCondition={"MaxRuntimeInSeconds": 86400},
    EnableNetworkIsolation=False,
)

print(response)

Launching a tuning job with the Python SDK

[ ]:
# we use the Hyperparameter Tuner
from sagemaker.tuner import IntegerParameter

# Define exploration boundaries
hyperparameter_ranges = {
    "n-estimators": IntegerParameter(20, 100),
    "min-samples-leaf": IntegerParameter(2, 6),
}

# create Optimizer
Optimizer = sagemaker.tuner.HyperparameterTuner(
    estimator=sklearn_estimator,
    hyperparameter_ranges=hyperparameter_ranges,
    base_tuning_job_name="RF-tuner",
    objective_type="Minimize",
    objective_metric_name="median-AE",
    metric_definitions=[
        {"Name": "median-AE", "Regex": "AE-at-50th-percentile: ([0-9.]+).*$"}
    ],  # extract tracked metric from logs with regexp
    max_jobs=10,
    max_parallel_jobs=2,
)
[ ]:
Optimizer.fit({"train": trainpath, "test": testpath})
[ ]:
# get tuner results in a df
results = Optimizer.analytics().dataframe()
while results.empty:
    time.sleep(1)
    results = Optimizer.analytics().dataframe()
results.head()

Deploy to a real-time endpoint

Deploy with Python SDK

An Estimator could be deployed directly after training, with an Estimator.deploy() but here we showcase the more extensive process of creating a model from s3 artifacts, that could be used to deploy a model that was trained in a different session or even out of SageMaker.

[ ]:
sklearn_estimator.latest_training_job.wait(logs="None")
artifact = sm_boto3.describe_training_job(
    TrainingJobName=sklearn_estimator.latest_training_job.name
)["ModelArtifacts"]["S3ModelArtifacts"]

print("Model artifact persisted at " + artifact)
[ ]:
from sagemaker.sklearn.model import SKLearnModel

model = SKLearnModel(
    model_data=artifact,
    role=get_execution_role(),
    entry_point="script.py",
    framework_version=FRAMEWORK_VERSION,
)
[ ]:
predictor = model.deploy(instance_type="ml.c5.large", initial_instance_count=1)

Invoke with the Python SDK

[ ]:
# the SKLearnPredictor does the serialization from pandas for us
print(predictor.predict(testX[data.feature_names]))

Alternative: invoke with boto3

[ ]:
runtime = boto3.client("sagemaker-runtime")

Option 1: csv serialization

[ ]:
# csv serialization
response = runtime.invoke_endpoint(
    EndpointName=predictor.endpoint,
    Body=testX[data.feature_names].to_csv(header=False, index=False).encode("utf-8"),
    ContentType="text/csv",
)

print(response["Body"].read())

Option 2: npy serialization

[ ]:
# npy serialization
from io import BytesIO


# Serialise numpy ndarray as bytes
buffer = BytesIO()
# Assuming testX is a data frame
np.save(buffer, testX[data.feature_names].values)

response = runtime.invoke_endpoint(
    EndpointName=predictor.endpoint, Body=buffer.getvalue(), ContentType="application/x-npy"
)

print(response["Body"].read())

Don’t forget to delete the endpoint !

[ ]:
sm_boto3.delete_endpoint(EndpointName=predictor.endpoint)