Creating an Amazon Forecast Predictor with SageMaker Pipelines

This example notebook shows how to create a dataset, dataset group and predictor with Amazon Forecast and SageMaker Pipelines. This demo is designed to run on SageMaker Notebook Instances. As of February 2022, this code does not run correctly in SageMaker Studio because of a Docker limitation in SageMaker Studio notebooks.

Integrating SageMaker Pipelines with Amazon Forecast is useful for the following three reasons:

  1. Iteratively improve your model by tracking the performance of each execution using SageMaker Experiments.

  2. Make your Forecast experiments reproducible.

  3. Decouple the different processes in your Amazon Forecast machine learning project and visualize them in a Directed Acyclic Graph using SageMaker Pipelines.

This notebook can be used as a template to start training your own Forecast predictors with SageMaker Pipelines. Before you start, make sure that your SageMaker Execution Role has the following policies:

  • AmazonForecastFullAccess

  • AmazonSageMakerFullAccess

Your SageMaker Execution Role should already have access to S3. If not, you can add an S3 policy. You will also need to add the inline policy described below:

[ ]:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "iam:GetRole",
                "s3:*",
                "iam:CreateRole",
                "iam:AttachRolePolicy",
                "forecast:*"
            ],
            "Resource": "*"
        }
    ]
}

Finally, the role needs the following trust policy.

[ ]:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": ["s3.amazonaws.com", "forecast.amazonaws.com", "sagemaker.amazonaws.com"]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
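
If you prefer to apply the two policy documents programmatically rather than in the IAM console, a sketch along the following lines may help. It assumes an IAM client created with boto3.client("iam") and credentials that are allowed to modify the role. The function name apply_policies, the role name, and the policy name are hypothetical placeholders, not part of this notebook.

```python
import json

# Inline permissions policy from this notebook, expressed as a Python dict.
INLINE_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "iam:GetRole",
                "s3:*",
                "iam:CreateRole",
                "iam:AttachRolePolicy",
                "forecast:*",
            ],
            "Resource": "*",
        }
    ],
}

# Trust policy from this notebook, expressed as a Python dict.
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "s3.amazonaws.com",
                    "forecast.amazonaws.com",
                    "sagemaker.amazonaws.com",
                ]
            },
            "Action": "sts:AssumeRole",
        }
    ],
}


def apply_policies(iam_client, role_name, policy_name="ForecastPipelineInline"):
    """Attach the inline policy and replace the trust policy on an existing role.

    policy_name is a hypothetical default chosen for this sketch.
    """
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(INLINE_POLICY),
    )
    iam_client.update_assume_role_policy(
        RoleName=role_name,
        PolicyDocument=json.dumps(TRUST_POLICY),
    )


# Example call (needs AWS credentials; the role name is a placeholder):
# import boto3
# apply_policies(boto3.client("iam"), "MySageMakerExecutionRole")
```

Note that update_assume_role_policy replaces the whole trust policy of the role, so review the existing trust relationships before running something like this.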

Prerequisites

First, we are going to import the SageMaker SDK and set some default variables such as the role for permissioned execution and the default_bucket to store model artifacts.

Then, we have to update the base Scikit-learn SageMaker image to upgrade boto3 and botocore. As of February 2022, the Scikit-learn image ships an older version of botocore (1.19.4) that does not yet contain the code for the API calls you need to make to Amazon Forecast. The script below creates an ECR repository with the given repo_name in your AWS account, in the region you are running this notebook from. It then uses the Prebuilt Amazon SageMaker Docker Image for Scikit-learn as the base image. This notebook automatically selects the correct image_acc_id for your region by parsing it from the image URI returned by sagemaker.image_uris.retrieve. The registry accounts per region are listed at https://docs.aws.amazon.com/sagemaker/latest/dg/pre-built-docker-containers-scikit-learn-spark.html.

[ ]:
! pip install sagemaker==2.93.0
[ ]:
import json
import tarfile
import time

import boto3
import botocore
import sagemaker
import os
import zipfile
import pandas as pd
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import ProcessingStep, TrainingStep

region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role_arn = sagemaker.get_execution_role()
image_uri = sagemaker.image_uris.retrieve(
    framework="sklearn", region=region, version="1.0-1", image_scope="training"
)
image_acc_id = image_uri.split(".")[0]
default_bucket = sagemaker_session.default_bucket()
default_bucket
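
The cell above derives image_acc_id by splitting the image URI on its first dot. The sketch below shows the same idea as a small helper with a sanity check. The helper name registry_account_id and the example URI (with a placeholder account ID) are illustrative assumptions, not values from this notebook.

```python
def registry_account_id(image_uri: str) -> str:
    """Return the AWS account ID that owns the ECR registry in an image URI."""
    # The registry host is everything before the first "/",
    # e.g. "<account>.dkr.ecr.<region>.amazonaws.com".
    registry = image_uri.split("/", 1)[0]
    account_id = registry.split(".", 1)[0]
    # AWS account IDs are 12 digits; fail early on anything else.
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError(f"Unexpected registry host in image URI: {image_uri}")
    return account_id


# Placeholder URI for illustration only; the account ID is not a real registry.
example_uri = "123456789012.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3"
print(registry_account_id(example_uri))
```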
[ ]:
%%sh -s "$image_acc_id" "$image_uri"

# The name of the ECR repository for the updated image
repo_name=sagemaker-sklearn-botocore-updated

account=$(aws sts get-caller-identity --query Account --output text)

# Get the region defined in the current AWS CLI configuration
region=$(aws configure get region)

# Write the Dockerfile
mkdir -p docker
cd docker

printf "FROM $2 \n
RUN python3 -m pip install --upgrade pip \n
RUN python3 -m pip install boto3==1.20.25 \n
RUN python3 -m pip install botocore==1.23.25 " > Dockerfile


fullname="${account}.dkr.ecr.${region}.amazonaws.com/${repo_name}:latest"
aws_base_image_acc="$1.dkr.ecr.${region}.amazonaws.com"

# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${repo_name}" > /dev/null 2>&1

if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${repo_name}" > /dev/null
fi

# Get the login command from ECR and execute it directly
aws ecr get-login-password --region ${region}|docker login --username AWS --password-stdin ${aws_base_image_acc}

# Build the docker image locally with the image name and then push it to ECR
# with the full name.
docker build -t ${repo_name} .
docker tag ${repo_name} ${fullname}

aws ecr get-login-password --region ${region}|docker login --username AWS --password-stdin "${account}".dkr.ecr."${region}".amazonaws.com
docker push ${fullname}

# Clean up unencrypted credentials and Dockerfile
cd ..
rm -rf docker
> /home/ec2-user/.docker/config.json

Dataset

Let’s inspect the training dataset we will be using in this example.

[ ]:
DATA_HOST = "sagemaker-sample-files"
DATA_PATH = "datasets/timeseries/uci_electricity/"
ARCHIVE_NAME = "LD2011_2014.txt.zip"
FILE_NAME = ARCHIVE_NAME[:-4]

s3_client = boto3.client("s3")

if not os.path.isfile(FILE_NAME):
    print("downloading dataset (258MB), can take a few minutes depending on your connection")
    s3_client.download_file(DATA_HOST, DATA_PATH + ARCHIVE_NAME, ARCHIVE_NAME)

    print("\nextracting data archive")
    zip_ref = zipfile.ZipFile(ARCHIVE_NAME, "r")
    zip_ref.extractall("./")
    zip_ref.close()
else:
    print("File found skipping download")
[ ]:
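df = pd.read_csv(FILE_NAME, sep=";", index_col=0, parse_dates=True, decimal=",")

# Take only one target time series
df = df[["MT_001"]]
print(df.index.min())
print(df.index.max())

# Save the series locally so it can be copied to S3 in the next step
df.to_csv("train.csv")

# Keep this as the last expression so the notebook displays the preview
df.head()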

The dataset spans January 1, 2011 to January 1, 2015. We are only going to use about two and a half weeks of hourly data to train Amazon Forecast. We will copy the dataset from this local directory to S3 so that SageMaker can access it.

[ ]:
!aws s3 cp ./train.csv s3://$default_bucket/forecast_pipeline_example/

Next, we define parameters that can be set for the execution of the pipeline. They serve as variables. We define the following:

  • ProcessingInstanceCount: The number of processing instances to use for the execution of the pipeline

  • ProcessingInstanceType: The type of processing instances to use for the execution of the pipeline

  • TrainingInstanceType: The type of training instances to use for the execution of the pipeline

  • TrainData: Location of the training data in S3

  • ModelOutput: Location of the target S3 path for the Amazon Forecast model artifact

Amazon Forecast creates its own validation set when training, so there is no need to provide one.

We also define some important parameters to choose, train and evaluate the model:

  • ForecastHorizon: The forecast horizon (prediction length)

  • ForecastAlgorithm: The Amazon Forecast algorithm to use (e.g. DeepArPlus, CNNQR, …)

  • EvaluationMetric: The evaluation metric used to decide whether to keep the model

  • MaxScore: The threshold on the evaluation metric above which the model is not kept

[ ]:
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.large"
)
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.large")

input_train = ParameterString(
    name="TrainData",
    default_value=f"s3://{default_bucket}/forecast_pipeline_example/train.csv",
)
model_output = ParameterString(name="ModelOutput", default_value=f"s3://{default_bucket}/model")

# Model parameters
forecast_horizon = ParameterString(name="ForecastHorizon", default_value="24")
forecast_algorithm = ParameterString(name="ForecastAlgorithm", default_value="NPTS")
maximum_score = ParameterString(name="MaxScore", default_value="0.4")
metric = ParameterString(name="EvaluationMetric", default_value="WAPE")

We run Python scripts that build a dataset group and train an Amazon Forecast predictor using boto3. In the next cell, we instantiate a ScriptProcessor that uses the image built above. It is essentially an SKLearnProcessor with updated boto3 and botocore, and we use it in the following steps.

[ ]:
account_id = role_arn.split(":")[4]
ecr_repository_name = "sagemaker-sklearn-botocore-updated"
tag = "latest"
container_image_uri = "{0}.dkr.ecr.{1}.amazonaws.com/{2}:{3}".format(
    account_id, region, ecr_repository_name, tag
)
[ ]:
sklearn_processor = ScriptProcessor(
    image_uri=container_image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="forecast-process",
    sagemaker_session=sagemaker_session,
    role=role_arn,
)

First, we preprocess the data using an Amazon SageMaker ProcessingStep, which provides a containerized execution environment to run the preprocess.py script.

[ ]:
preprocess = ProcessingStep(
    name="ForecastPreProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_train, destination="/opt/ml/processing/input_train"),
    ],
    outputs=[
        ProcessingOutput(output_name="target", source="/opt/ml/processing/target"),
        ProcessingOutput(output_name="related", source="/opt/ml/processing/related"),
    ],
    job_arguments=["--forecast_horizon", forecast_horizon],
    code="preprocess.py",
)
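
The preprocess.py script itself is not reproduced here. As a rough illustration of the kind of transformation such a script might perform, the sketch below aggregates sub-hourly readings into hourly values and writes them as CSV rows. Everything in it is an assumption made for illustration: the function names, the choice of summing rather than averaging, and the column order (timestamp, target value, item ID).

```python
import csv
import io
from collections import defaultdict
from datetime import datetime


def to_hourly_target_rows(readings, item_id):
    """Sum sub-hourly (timestamp, value) readings into hourly rows.

    Summing is an assumption of this sketch; averaging may suit other data.
    """
    hourly = defaultdict(float)
    for timestamp, value in readings:
        # Truncate each timestamp to the start of its hour.
        bucket = timestamp.replace(minute=0, second=0, microsecond=0)
        hourly[bucket] += value
    return [
        (bucket.strftime("%Y-%m-%d %H:%M:%S"), round(total, 4), item_id)
        for bucket, total in sorted(hourly.items())
    ]


def rows_to_csv(rows):
    """Serialize rows to CSV text without a header line."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerows(rows)
    return buffer.getvalue()


# Made-up readings for illustration only.
readings = [
    (datetime(2014, 1, 1, 0, 15), 1.0),
    (datetime(2014, 1, 1, 0, 30), 2.0),
    (datetime(2014, 1, 1, 1, 0), 4.0),
]
print(rows_to_csv(to_hourly_target_rows(readings, "MT_001")))
```

In a real processing script, the resulting text would be written under the output directories declared in the step (for example /opt/ml/processing/target) so that SageMaker uploads it to S3.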

The next step is to train and evaluate the forecasting model by calling Amazon Forecast through boto3. We instantiate an SKLearn estimator that we use in the next TrainingStep to run the script train.py. The training itself is managed by Amazon Forecast, which automatically evaluates performance on an evaluation set. We will use that score as the condition for keeping the model. We use a TrainingStep instead of a ProcessingStep so that the metrics are logged with SageMaker Experiments.

[ ]:
# Define the hyperparameters and the regular expressions used to extract the metrics
hyperparameters = {
    "forecast_horizon": forecast_horizon,
    "forecast_algorithm": forecast_algorithm,
    "dataset_frequency": "H",
    "timestamp_format": "yyyy-MM-dd hh:mm:ss",
    "number_of_backtest_windows": "1",
    "s3_directory_target": preprocess.properties.ProcessingOutputConfig.Outputs[
        "target"
    ].S3Output.S3Uri,
    "s3_directory_related": preprocess.properties.ProcessingOutputConfig.Outputs[
        "related"
    ].S3Output.S3Uri,
    "role_arn": role_arn,
    "region": region,
}
metric_definitions = [
    {"Name": "WAPE", "Regex": "WAPE=(.*?);"},
    {"Name": "RMSE", "Regex": "RMSE=(.*?);"},
    {"Name": "MASE", "Regex": "MASE=(.*?);"},
    {"Name": "MAPE", "Regex": "MAPE=(.*?);"},
]
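
To see how these metric definitions behave, the sketch below applies the same regular expressions to a single log line. The log line and its values are made up for illustration. The only assumption about the training script is that it prints each metric in the form NAME=value; so that the patterns match.

```python
import re

# Same patterns as the metric definitions used in this notebook.
metric_definitions = [
    {"Name": "WAPE", "Regex": "WAPE=(.*?);"},
    {"Name": "RMSE", "Regex": "RMSE=(.*?);"},
    {"Name": "MASE", "Regex": "MASE=(.*?);"},
    {"Name": "MAPE", "Regex": "MAPE=(.*?);"},
]


def extract_metrics(log_line, definitions):
    """Return {metric name: float value} for every pattern found in log_line."""
    found = {}
    for definition in definitions:
        match = re.search(definition["Regex"], log_line)
        if match:
            # The non-greedy group captures everything up to the first ";".
            found[definition["Name"]] = float(match.group(1))
    return found


# Hypothetical log line; the values are not real results.
log_line = "WAPE=0.35; RMSE=12.5; MASE=0.9; MAPE=0.2;"
print(extract_metrics(log_line, metric_definitions))
```

The trailing semicolon matters: without it, the non-greedy group has nothing to stop at and the pattern does not match.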
[ ]:
forecast_model = SKLearn(
    entry_point="train.py",
    role=role_arn,
    image_uri=container_image_uri,
    instance_type=training_instance_type,
    sagemaker_session=sagemaker_session,
    base_job_name="forecast-train",
    hyperparameters=hyperparameters,
    enable_sagemaker_metrics=True,
    metric_definitions=metric_definitions,
)
[ ]:
forecast_train_and_eval = TrainingStep(name="ForecastTrainAndEvaluate", estimator=forecast_model)

The third step is an Amazon SageMaker ProcessingStep that runs the script conditional_delete.py to either keep or delete the Amazon Forecast model. If the error reported after training is higher than the threshold you specify for the chosen metric, this step deletes all the resources created by Amazon Forecast that are related to the pipeline’s execution.

[ ]:
postprocess = ProcessingStep(
    name="ForecastConditionalDelete",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            source=forecast_train_and_eval.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
    ],
    job_arguments=[
        "--metric",
        metric,
        "--maximum-score",
        maximum_score,
        "--region",
        region,
    ],
    code="conditional_delete.py",
)
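
The conditional_delete.py script is not shown here, but the decision it is described as making can be sketched in a few lines. The function name should_delete and the shape of the metrics dictionary are assumptions made for illustration. The comparison follows the description above: resources are deleted only when the error is strictly higher than the threshold.

```python
def should_delete(metrics, metric, maximum_score):
    """Return True when the chosen error metric exceeds the threshold.

    metrics maps metric names to values, e.g. {"WAPE": 0.35}.
    maximum_score may arrive as a string because pipeline parameters
    such as MaxScore are passed as strings.
    """
    score = float(metrics[metric])
    return score > float(maximum_score)


# Hypothetical values for illustration only.
print(should_delete({"WAPE": 0.5}, "WAPE", "0.4"))
print(should_delete({"WAPE": 0.3}, "WAPE", "0.4"))
```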

Finally, we combine all the steps and define our pipeline.

[ ]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "ForecastPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        input_train,
        forecast_horizon,
        forecast_algorithm,
        model_output,
        metric,
        maximum_score,
    ],
    steps=[preprocess, forecast_train_and_eval, postprocess],
    pipeline_experiment_config=PipelineExperimentConfig(
        ExecutionVariables.PIPELINE_NAME,
        Join(on="-", values=["ForecastTrial", ExecutionVariables.PIPELINE_EXECUTION_ID]),
    ),
)

Once the pipeline is successfully defined, we can start the execution.

[ ]:
pipeline.upsert(role_arn=role_arn)
[ ]:
execution = pipeline.start()
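
The pipeline parameters can also be overridden for a single execution instead of relying on their default values. The sketch below builds such an override dictionary and checks the names against the parameters declared in this notebook. The helper build_overrides is hypothetical, and the override values are examples only.

```python
# Parameter names declared for this pipeline, grouped by type.
STRING_PARAMETERS = {
    "ProcessingInstanceType",
    "TrainingInstanceType",
    "TrainData",
    "ModelOutput",
    "ForecastHorizon",
    "ForecastAlgorithm",
    "EvaluationMetric",
    "MaxScore",
}
INTEGER_PARAMETERS = {"ProcessingInstanceCount"}


def build_overrides(**overrides):
    """Validate parameter names and coerce values to the declared types."""
    result = {}
    for name, value in overrides.items():
        if name in INTEGER_PARAMETERS:
            result[name] = int(value)
        elif name in STRING_PARAMETERS:
            result[name] = str(value)
        else:
            raise KeyError(f"Unknown pipeline parameter: {name}")
    return result


overrides = build_overrides(ForecastAlgorithm="CNNQR", ForecastHorizon=48, MaxScore=0.3)
print(overrides)

# With the pipeline object from this notebook, the overrides could be passed as:
# execution = pipeline.start(parameters=overrides)
```

Validating the names up front gives a clearer error than a failed API call when a parameter name is misspelled.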
[ ]:
execution.wait(delay=300, max_attempts=25)
[ ]:
execution.list_steps()
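
The list returned by execution.list_steps() can be condensed into a quick status overview. The sketch below works on plain dictionaries shaped like the entries of that list. The sample data is made up, and the helper names are hypothetical.

```python
def summarize_steps(steps):
    """Map each step name to its status."""
    return {step["StepName"]: step["StepStatus"] for step in steps}


def failed_steps(steps):
    """Return (name, reason) pairs for steps that did not succeed."""
    return [
        (step["StepName"], step.get("FailureReason", "no reason reported"))
        for step in steps
        if step["StepStatus"] == "Failed"
    ]


# Made-up sample shaped like the output of execution.list_steps().
sample_steps = [
    {
        "StepName": "ForecastTrainAndEvaluate",
        "StepStatus": "Failed",
        "FailureReason": "ClientError: example failure",
    },
    {"StepName": "ForecastPreProcess", "StepStatus": "Succeeded"},
]

print(summarize_steps(sample_steps))
print(failed_steps(sample_steps))
```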

Experiments Tracking

Each pipeline execution is tracked by default when using SageMaker Pipelines. To find the experiment tracking in SageMaker Studio, you should open SageMaker Resources and select Experiments and Trials. The experiments and trials are organized as follows:

  • The Pipeline ForecastPipeline is associated with an Experiment.

    • Each execution of Pipeline ForecastPipeline is associated with a trial.

      • Each step within the execution is associated with a trial component within trial.
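
With the experiment configuration used in this pipeline, the trial name is built by joining "ForecastTrial" and the pipeline execution ID with a hyphen. The sketch below mimics that naming so you know what to search for. It assumes the execution ID is the last segment of the execution ARN, and the ARN shown is a made-up placeholder.

```python
def execution_id_from_arn(execution_arn):
    """Return the last path segment of a pipeline execution ARN."""
    return execution_arn.rsplit("/", 1)[-1]


def trial_name_for(execution_id, prefix="ForecastTrial"):
    """Mimic Join(on="-", values=[prefix, execution_id])."""
    return "-".join([prefix, execution_id])


# Placeholder ARN for illustration only.
example_arn = (
    "arn:aws:sagemaker:us-east-1:123456789012:"
    "pipeline/forecastpipeline/execution/abc123example"
)
print(trial_name_for(execution_id_from_arn(example_arn)))
```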

To find the Trial Components and Trial name generated by a pipeline execution in ForecastPipeline, do the following:

  1. Open SageMaker Resources and select Experiments and Trials.

  2. Right-click on your Pipeline’s name (ForecastPipeline) and select Open in trial component list.

  3. You can now filter the trial components and customize the table view as presented in View and Compare Amazon SageMaker Experiments, Trials, and Trial Components.

The AWS documentation for Experiments Tracking can be found under Manage Machine Learning with Amazon SageMaker Experiments.

Conclusion

In this notebook we have seen how to create a SageMaker Pipeline to train an Amazon Forecast predictor on your own dataset with a target and related time series.

Clean up

Feel free to clean up all related resources that could incur costs: the pipeline, the S3 object (train.csv), and all Forecast-related resources.

[ ]:
def wait_till_delete(callback, check_time=5, timeout=100):
    elapsed_time = 0
    while timeout is None or elapsed_time < timeout:
        try:
            callback()
        except botocore.exceptions.ClientError as e:
            # A resource-not-found error means the deletion has completed
            if e.response["Error"]["Code"] == "ResourceNotFoundException":
                print("Successful delete")
                return
            else:
                raise
        time.sleep(check_time)  # units of seconds
        elapsed_time += check_time

    raise TimeoutError("Forecast resource deletion timed-out.")
[ ]:
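session = boto3.Session(region_name=region)
# Use a distinct name so the imported sagemaker module is not shadowed
sm_client = session.client(service_name="sagemaker")

# Look up the training step by name rather than by its position in the list
steps = execution.list_steps()
training_step = next(s for s in steps if s["StepName"] == "ForecastTrainAndEvaluate")
training_job_name = training_step["Metadata"]["TrainingJob"]["Arn"].split("/")[1]
response = sm_client.describe_training_job(TrainingJobName=training_job_name)
s3_artifacts = response["ModelArtifacts"]["S3ModelArtifacts"]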
[ ]:
s3_artifacts
[ ]:
! aws s3 cp $s3_artifacts .
[ ]:
forecast = session.client(service_name="forecast")

session.resource("s3").Bucket(default_bucket).Object("forecast_pipeline_example/train.csv").delete()

with tarfile.open("model.tar.gz") as tar:
    tar.extractall(path=".")

with open("model_parameters.json", "r") as f:
    model_params = json.load(f)
[ ]:
wait_till_delete(
    lambda: forecast.delete_predictor(PredictorArn=model_params["forecast_arn_predictor"])
)
[ ]:
for arn in ["target_import_job_arn", "related_import_job_arn"]:
    wait_till_delete(
        lambda: forecast.delete_dataset_import_job(DatasetImportJobArn=model_params[arn])
    )
[ ]:
for arn in ["target_dataset_arn", "related_dataset_arn"]:
    wait_till_delete(lambda: forecast.delete_dataset(DatasetArn=model_params[arn]))
[ ]:
wait_till_delete(
    lambda: forecast.delete_dataset_group(DatasetGroupArn=model_params["dataset_group_arn"])
)
[ ]:
pipeline.delete()