Cart-pole Balancing Model with Amazon SageMaker and Ray


Introduction

In this notebook we’ll start from the cart-pole balancing problem, where a pole is attached by an un-actuated joint to a cart, moving along a frictionless track. Instead of applying control theory to solve the problem, this example shows how to solve the problem with reinforcement learning on Amazon SageMaker and Ray RLlib. You can choose either TensorFlow or PyTorch as your underlying DL framework.

(For a similar example using Coach library, see this link. Another Cart-pole example using Coach library and offline data can be found here.)

  1. Objective: Prevent the pole from falling over

  2. Environment: The environment used in this exmaple is part of OpenAI Gym, corresponding to the version of the cart-pole problem described by Barto, Sutton, and Anderson [1]

  3. State: Cart position, cart velocity, pole angle, pole velocity at tip

  4. Action: Push cart to the left, push cart to the right

  5. Reward: Reward is 1 for every step taken, including the termination step

References

  1. AG Barto, RS Sutton and CW Anderson, “Neuronlike Adaptive Elements That Can Solve Difficult Learning Control Problem”, IEEE Transactions on Systems, Man, and Cybernetics, 1983.

Pre-requisites

Imports

To get started, we’ll import the Python libraries as needed, set up the environment with a few prerequisites for permissions and configurations.

[ ]:
import sagemaker
import boto3
import sys
import os
import glob
import re
import subprocess
import numpy as np
from IPython.display import HTML
import time
from time import gmtime, strftime

sys.path.append("common")
from misc import get_execution_role, wait_for_s3_object
from docker_utils import build_and_push_docker_image
from sagemaker.rl import RLEstimator, RLToolkit, RLFramework

Setup S3 bucket

Set up the linkage and authentication to the S3 bucket that you want to use for checkpoint and the metadata.

[ ]:
sage_session = sagemaker.session.Session()
s3_bucket = sage_session.default_bucket()
s3_output_path = "s3://{}/".format(s3_bucket)
print("S3 bucket path: {}".format(s3_output_path))

Define Variables

We define variables such as the job prefix and frameworks for the training jobs, to fetch the latest docker container to train your RL agent. You can also provide an image path for a custom container (only when this is BYOC).

Set framework to 'tf' or 'torch' for TensorFlow or PyTorch, respectively.

[ ]:
# create a descriptive job name
job_name_prefix = "rl-cartpole-ray"

framework = "tf"

Configure where training happens

You can train your RL training jobs using the SageMaker notebook instance or local notebook instance. In both of these scenarios, you can run the following in either local or SageMaker modes. The local mode uses the SageMaker Python SDK to run your code in a local container before deploying to SageMaker. This can speed up iterative testing and debugging while using the same familiar Python SDK interface. You just need to set local_mode = True.

[ ]:
# run in local_mode on this machine, or as a SageMaker TrainingJob?
local_mode = False

if local_mode:
    instance_type = "local"
else:
    # If on SageMaker, pick the instance type.
    instance_type = "ml.c5.2xlarge"
[ ]:
train_instance_count = 1

Create an IAM role

Either get the execution role when running from a SageMaker notebook instance role = sagemaker.get_execution_role() or, when running from local notebook instance, use utils method role = get_execution_role() to create an execution role.

[ ]:
try:
    role = sagemaker.get_execution_role()
except:
    role = get_execution_role()

print("Using IAM role arn: {}".format(role))

Install docker for local mode

In order to work in local mode, you need to have docker installed. When running from you local machine, please make sure that you have docker and docker-compose (for local CPU machines) and nvidia-docker (for local GPU machines) installed. Alternatively, when running from a SageMaker notebook instance, you can simply run the following script to install dependenceis.

Note, you can only run a single local notebook at one time.

[ ]:
# only run from SageMaker notebook instance
if local_mode:
    !/bin/bash ./common/setup.sh

Write the Training Code

The training code is written in the file “train-rl-cartpole-ray.py” which is uploaded in the /src directory. First import the environment files and the preset files, and then define the main() function.

Note: If PyTorch is used, plese update the above training code and set use_pytorch to True in the config.

[ ]:
!pygmentize src/train-rl-cartpole-ray.py

Train the RL model using the Python SDK Script mode

If you are using local mode, the training will run on the notebook instance. When using SageMaker for training, you can select a GPU or CPU instance. The RLEstimator is used for training RL jobs.

  1. Specify the source directory where the environment, presets and training code is uploaded.

  2. Specify the entry point as the training code

  3. Specify training image using toolkit_version or provide custom image path using image_uri.

  4. Define the training parameters such as the instance count, job name, S3 path for output and job name.

  5. Define the metrics definitions that you are interested in capturing in your logs. These can also be visualized in CloudWatch and SageMaker Notebooks.

[ ]:
%%time

metric_definitions = RLEstimator.default_metric_definitions(RLToolkit.RAY)

estimator = RLEstimator(
    entry_point="train-rl-cartpole-ray.py",
    source_dir="src",
    dependencies=["common/sagemaker_rl"],
    toolkit=RLToolkit.RAY,
    framework=RLFramework.TENSORFLOW,
    toolkit_version="1.6.0",
    role=role,
    debugger_hook_config=False,
    instance_type=instance_type,
    instance_count=train_instance_count,
    output_path=s3_output_path,
    base_job_name=job_name_prefix,
    metric_definitions=metric_definitions,
    hyperparameters={
        # Attention scientists!  You can override any Ray algorithm parameter here:
        # "rl.training.config.horizon": 5000,
        # "rl.training.config.num_sgd_iter": 10,
    },
)

estimator.fit(wait=True)
job_name = estimator.latest_training_job.job_name
print("Training job: %s" % job_name)

Plot metrics for training job

We can see the reward metric of the training as it’s running, using algorithm metrics that are recorded in CloudWatch metrics. We can plot this to see the performance of the model over time.

[ ]:
%matplotlib inline
from sagemaker.analytics import TrainingJobAnalytics

if not local_mode:
    df = TrainingJobAnalytics(job_name, ["episode_reward_mean"]).dataframe()
    num_metrics = len(df)
    if num_metrics == 0:
        print("No algorithm metrics found in CloudWatch")
    else:
        plt = df.plot(x="timestamp", y="value", figsize=(12, 5), legend=True, style="b-")
        plt.set_ylabel("Mean reward per episode")
        plt.set_xlabel("Training time (s)")
else:
    print("Can't plot metrics in local mode.")

Evaluation of RL models

We use the last checkpointed model to run evaluation for the RL Agent.

Load checkpointed model

Checkpointed data from the previously trained models will be passed on for evaluation / inference in the checkpoint channel. In local mode, we can simply use the local directory, whereas in the SageMaker mode, it needs to be moved to S3 first.

[ ]:
if local_mode:
    model_tar_key = "{}/model.tar.gz".format(job_name)
else:
    model_tar_key = "{}/output/model.tar.gz".format(job_name)

tmp_dir = "/tmp/{}".format(job_name)
os.system("mkdir {}".format(tmp_dir))
print("Create local folder {}".format(tmp_dir))
local_checkpoint_dir = "{}/model".format(tmp_dir)

wait_for_s3_object(s3_bucket, model_tar_key, tmp_dir, training_job_name=job_name)

if not os.path.isfile("{}/model.tar.gz".format(tmp_dir)):
    raise FileNotFoundError("File model.tar.gz not found")

os.system("mkdir -p {}".format(local_checkpoint_dir))
os.system("tar -xvzf {}/model.tar.gz -C {}".format(tmp_dir, local_checkpoint_dir))

print("Checkpoint directory {}".format(local_checkpoint_dir))
[ ]:
if local_mode:
    checkpoint_path = "file://{}".format(local_checkpoint_dir)
    print("Local checkpoint file path: {}".format(local_checkpoint_dir))
else:
    checkpoint_path = "s3://{}/{}/checkpoint/".format(s3_bucket, job_name)
    if not os.listdir(local_checkpoint_dir):
        raise FileNotFoundError("Checkpoint files not found under the path")
    os.system("aws s3 cp --recursive {} {}".format(local_checkpoint_dir, checkpoint_path))
    print("S3 checkpoint file path: {}".format(checkpoint_path))
[ ]:
%%time

estimator_eval = RLEstimator(
    entry_point="evaluate-ray.py",
    source_dir="src",
    dependencies=["common/sagemaker_rl"],
    toolkit=RLToolkit.RAY,
    framework=RLFramework.TENSORFLOW,
    toolkit_version="1.6.0",
    role=role,
    instance_type=instance_type,
    instance_count=1,
    base_job_name=job_name_prefix + "-evaluation",
    hyperparameters={"evaluate_episodes": 10, "algorithm": "PPO", "env": "CartPole-v1"},
)

estimator_eval.fit({"model": checkpoint_path})
job_name = estimator_eval.latest_training_job.job_name
print("Evaluation job: %s" % job_name)

Model deployment

Now let us deploy the RL policy so that we can get the optimal action, given an environment observation.

Note: Model deployment is supported for TensorFLow only at current stage.

STOP HERE IF PYTORCH IS USED.

[ ]:
from sagemaker.tensorflow.model import TensorFlowModel

model = TensorFlowModel(model_data=estimator.model_data, framework_version="2.5.1", role=role)

predictor = model.deploy(initial_instance_count=1, instance_type=instance_type)
[ ]:
# ray 1.6.0 requires all the following inputs, ray 0.8.5 or below remove 'timestep'
# 'prev_action', 'is_training', 'prev_reward', 'seq_lens' and 'timestep' are placeholders for this example
# they won't affect prediction results

# Number of different values stored in at any time in the current state for the Cartpole example.
CARTPOLE_STATE_VALUES = 4

input = {
    "inputs": {
        "observations": np.ones(shape=(1, CARTPOLE_STATE_VALUES)).tolist(),
        "prev_action": [0, 0],
        "is_training": False,
        "prev_reward": -1,
        "seq_lens": -1,
        "timestep": 1,
    }
}
[ ]:
result = predictor.predict(input)

result["outputs"]

Clean up endpoint

[ ]:
predictor.delete_endpoint()
[ ]: