Introduction to MPI on Amazon SageMaker
Message Passing Interface (MPI) is the standard communication protocol for writing parallel programs. See its wiki page. Open MPI is an implementation of MPI that is used as a basic building block for distributed training systems.
In Python programs, you can interact with Open MPI APIs via mpi4py and easily convert your single-process Python program into a parallel Python program.
Parallel processes can exist on one host (e.g. one EC2 instance) or multiple hosts (e.g. many EC2 instances). It’s trivial to set up a parallel cluster (comm world, in MPI parlance) on one host via Open MPI, but it is less straightforward to set up an MPI comm world across multiple instances.
SageMaker does it for you. In this tutorial, you will go through a few basic (but exceedingly important) MPI communications on SageMaker with multiple instances, and you will verify that parallel processes across instances are indeed talking to each other. These basic communications are the fundamental building blocks of distributed training.
Environment
We assume Open MPI and mpi4py have been installed in your environment. This is the case for a SageMaker Notebook Instance or SageMaker Studio.
Inspect the Python Program
[1]:
!pygmentize mpi_demo.py
from mpi4py import MPI
import numpy as np
import time

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    print("Number of MPI processes that will talk to each other:", size)

def point_to_point():
    """Point-to-point communication.

    Send a NumPy array (a buffer-like object) from rank 0 to rank 1.
    """
    if rank == 0:
        print('point to point')
        data = np.array([0, 1, 2], dtype=np.intc)  # int in C

        # Remember the difference between the upper-case API
        # and the lower-case API: upper-case methods such as
        # Send/Recv work directly on buffers and map closely
        # to the C API, so they are fast.
        # See https://mpi4py.readthedocs.io/en/stable/
        comm.Send([data, MPI.INT], dest=1)
    elif rank == 1:
        print(f'Hello I am rank {rank}')
        data = np.empty(3, dtype=np.intc)
        comm.Recv([data, MPI.INT], source=0)
        print('I received some data:', data)

    if rank == 0:
        time.sleep(1)  # give some buffer time for execution to complete
        print("="*50)
    return

def broadcast():
    """Broadcast a numpy array from rank 0 to others"""
    if rank == 0:
        print(f'Broadcasting from rank {rank}')
        data = np.arange(10, dtype=np.intc)
    else:
        data = np.empty(10, dtype=np.intc)

    comm.Bcast([data, MPI.INT], root=0)
    print(f"Data at rank {rank}", data)

    if rank == 0:
        time.sleep(1)
        print("="*50)
    return

def gather_reduce_broadcast():
    """Gather NumPy arrays from all ranks to rank 0,
    then take the average and broadcast the result to the other ranks.

    It is a useful operation in distributed training:
    train a model in a few MPI workers with different
    input data, then average the weights on rank 0 and
    synchronize the weights on the other ranks.
    """
    # stuff to gather at each rank
    sendbuf = np.zeros(10, dtype=np.intc) + rank
    recvbuf = None
    if rank == 0:
        print('Gather and reduce')
        recvbuf = np.empty([size, 10], dtype=np.intc)
    comm.Gather(sendbuf, recvbuf, root=0)

    if rank == 0:
        print(f"I am rank {rank}, data I gathered is: {recvbuf}")
        # take average
        # think of it as a prototype of
        # average weights, average gradients etc
        avg = np.mean(recvbuf, axis=0, dtype=np.float64)
    else:
        # get averaged array from rank 0
        # think of it as a prototype of
        # synchronizing weights across different MPI procs
        avg = np.empty(10, dtype=np.float64)

    # Note that the data type is double (float64) here
    # because we took the average; np.float64 corresponds
    # to MPI.DOUBLE
    comm.Bcast([avg, MPI.DOUBLE], root=0)
    print(f'I am rank {rank}, my avg is: {avg}')
    return

if __name__ == '__main__':
    point_to_point()
    broadcast()
    gather_reduce_broadcast()
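The comment in point_to_point mentions the difference between the upper-case and lower-case mpi4py APIs. As a rough illustration that runs without MPI, the sketch below mimics the two data paths using only the standard library: lower-case methods (send, recv, bcast) pickle arbitrary Python objects, while upper-case methods (Send, Recv, Bcast) move the raw memory of a typed buffer into a pre-allocated receive buffer. The variable names are hypothetical, and array.array stands in for a NumPy array; no MPI call is made here.

```python
import pickle
from array import array

# Lower-case path: any picklable object is serialized to bytes
# and rebuilt on the receiving side.
message = {"step": 3, "weights": [0.1, 0.2]}
wire_bytes = pickle.dumps(message)
restored = pickle.loads(wire_bytes)

# Upper-case path: a contiguous, typed buffer is copied as is into
# a receive buffer that must already have the right size and type
# (the role np.empty plays in mpi_demo.py).
data = array("i", [0, 1, 2])                     # C ints, like np.intc
raw = memoryview(data)
received = array("i", bytes(3 * data.itemsize))  # pre-allocated, zero-filled
memoryview(received)[:] = raw                    # raw memory copy, no pickling

print(restored)
print(received.tolist(), raw.nbytes, "bytes")
```

The buffer path skips serialization entirely, which is why the demo script uses the upper-case methods for NumPy arrays.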
See the program in action with 2 parallel processes in your current environment. Make sure you have at least 2 cores.
[2]:
!mpirun -np 2 python mpi_demo.py
Number of MPI processes that will talk to each other: 2
point to point
Hello I am rank 1
I received some data: [0 1 2]
==================================================
Broadcasting from rank 0
Data at rank 0 [0 1 2 3 4 5 6 7 8 9]
Data at rank 1 [0 1 2 3 4 5 6 7 8 9]
==================================================
Gather and reduce
I am rank 0, data I gathered is: [[0 0 0 0 0 0 0 0 0 0]
[1 1 1 1 1 1 1 1 1 1]]
I am rank 0, my avg is: [0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5]
I am rank 1, my avg is: [0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5]
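The averages printed by gather_reduce_broadcast can be cross-checked without launching MPI. The following is a minimal sketch in plain Python that reproduces the arithmetic only: each rank contributes a row filled with its own rank number, and the average is taken column by column. The helper name expected_average is hypothetical and is not part of mpi_demo.py.

```python
def expected_average(world_size, length=10):
    """Column-wise mean of the rows that rank 0 would gather."""
    # Row r holds the value r repeated `length` times, mirroring
    # sendbuf = np.zeros(10) + rank in the demo script.
    gathered = [[rank] * length for rank in range(world_size)]
    return [sum(column) / len(column) for column in zip(*gathered)]


two_ranks = expected_average(2)
six_ranks = expected_average(6)
print(two_ranks)
print(six_ranks)
```

For a comm world of size s the result is (s - 1) / 2 in every position, so a run with 2 processes should report 0.5 and a run with 6 processes should report 2.5.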
Scale it on SageMaker
You can run the above program with \(n\) processes per host across \(N\) hosts on SageMaker (and get a comm world of size \(n\times N\)). In the remainder of this notebook, you will use the SageMaker TensorFlow deep learning container (DLC) to run the above program. There is no particular reason for this choice: all SageMaker deep learning containers have Open MPI installed, so feel free to replace it with your favorite DLC.
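To make the \(n\times N\) arithmetic concrete, here is a small sketch that lays out ranks over hosts. It assumes ranks are assigned host by host in the order the hosts are listed, which is what the job map in the log of this notebook's run shows; the helper name comm_world_layout is hypothetical, and the host names follow the algo-1, algo-2, ... pattern seen in that log.

```python
def comm_world_layout(hosts, processes_per_host):
    """Map each MPI rank to a host, filling one host before the next."""
    return {
        host_index * processes_per_host + local_index: host
        for host_index, host in enumerate(hosts)
        for local_index in range(processes_per_host)
    }


hosts = ["algo-1", "algo-2", "algo-3"]  # N = 3 hosts
layout = comm_world_layout(hosts, 2)    # n = 2 processes per host
world_size = len(layout)                # n x N

print("comm world size:", world_size)
for rank in sorted(layout):
    print(f"rank {rank} -> {layout[rank]}")
```

With 3 hosts and 2 processes per host this gives ranks 0 and 1 on the first host, 2 and 3 on the second, and 4 and 5 on the third.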
Check out the SageMaker Python SDK Docs for the parameters needed to set up a distributed training job with MPI.
[3]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.tensorflow import TensorFlow

role = get_execution_role()

# Running 2 processes per host
# if we use 3 instances,
# then we should see 6 MPI processes
distribution = {"mpi": {"enabled": True, "processes_per_host": 2}}

tfest = TensorFlow(
    entry_point="mpi_demo.py",
    role=role,
    framework_version="2.3.0",
    distribution=distribution,
    py_version="py37",
    instance_count=3,
    instance_type="ml.c5.2xlarge",  # 8 cores
    output_path="s3://" + sagemaker.Session().default_bucket() + "/" + "mpi",
)
[4]:
tfest.fit()
2021-05-11 19:56:11 Starting - Starting the training job...
2021-05-11 19:56:35 Starting - Launching requested ML instancesProfilerReport-1620762971: InProgress
......
2021-05-11 19:57:36 Starting - Preparing the instances for training......
2021-05-11 19:58:36 Downloading - Downloading input data...
2021-05-11 19:59:09 Training - Training image download completed. Training in progress..2021-05-11 19:59:14,435 sagemaker-training-toolkit INFO Imported framework sagemaker_tensorflow_container.training
2021-05-11 19:59:14,442 sagemaker-training-toolkit INFO No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:14,937 sagemaker-training-toolkit INFO No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:14,951 sagemaker-training-toolkit INFO No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:14,959 sagemaker-training-toolkit INFO Starting MPI run as worker node.
2021-05-11 19:59:14,959 sagemaker-training-toolkit INFO Waiting for MPI Master to create SSH daemon.
2021-05-11 19:59:14,961 sagemaker-training-toolkit INFO Cannot connect to host algo-1
2021-05-11 19:59:14,961 sagemaker-training-toolkit INFO Connection failed with exception:
[Errno None] Unable to connect to port 22 on 10.0.97.76
2021-05-11 19:59:15,967 paramiko.transport INFO Connected (version 2.0, client OpenSSH_7.6p1)
2021-05-11 19:59:16,062 paramiko.transport INFO Authentication (publickey) successful!
2021-05-11 19:59:16,062 sagemaker-training-toolkit INFO Can connect to host algo-1
2021-05-11 19:59:16,062 sagemaker-training-toolkit INFO MPI Master online, creating SSH daemon.
2021-05-11 19:59:16,062 sagemaker-training-toolkit INFO Writing environment variables to /etc/environment for the MPI process.
2021-05-11 19:59:16,072 sagemaker-training-toolkit INFO Waiting for MPI process to finish.
2021-05-11 19:59:13,928 sagemaker-training-toolkit INFO Imported framework sagemaker_tensorflow_container.training
2021-05-11 19:59:13,935 sagemaker-training-toolkit INFO No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:14,398 sagemaker-training-toolkit INFO No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:14,413 sagemaker-training-toolkit INFO No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:14,421 sagemaker-training-toolkit INFO Starting MPI run as worker node.
2021-05-11 19:59:14,422 sagemaker-training-toolkit INFO Waiting for MPI Master to create SSH daemon.
2021-05-11 19:59:14,423 sagemaker-training-toolkit INFO Cannot connect to host algo-1
2021-05-11 19:59:14,423 sagemaker-training-toolkit INFO Connection failed with exception:
[Errno None] Unable to connect to port 22 on 10.0.97.76
2021-05-11 19:59:15,430 paramiko.transport INFO Connected (version 2.0, client OpenSSH_7.6p1)
2021-05-11 19:59:15,541 paramiko.transport INFO Authentication (publickey) successful!
2021-05-11 19:59:15,542 sagemaker-training-toolkit INFO Can connect to host algo-1
2021-05-11 19:59:15,542 sagemaker-training-toolkit INFO MPI Master online, creating SSH daemon.
2021-05-11 19:59:15,542 sagemaker-training-toolkit INFO Writing environment variables to /etc/environment for the MPI process.
2021-05-11 19:59:15,553 sagemaker-training-toolkit INFO Waiting for MPI process to finish.
2021-05-11 19:59:14,525 sagemaker-training-toolkit INFO Imported framework sagemaker_tensorflow_container.training
2021-05-11 19:59:14,532 sagemaker-training-toolkit INFO No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:15,051 sagemaker-training-toolkit INFO No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:15,065 sagemaker-training-toolkit INFO No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:15,074 sagemaker-training-toolkit INFO Starting MPI run as worker node.
2021-05-11 19:59:15,074 sagemaker-training-toolkit INFO Creating SSH daemon.
2021-05-11 19:59:15,080 sagemaker-training-toolkit INFO Waiting for MPI workers to establish their SSH connections
2021-05-11 19:59:15,082 sagemaker-training-toolkit INFO Cannot connect to host algo-2
2021-05-11 19:59:15,082 sagemaker-training-toolkit INFO Connection failed with exception:
[Errno None] Unable to connect to port 22 on 10.0.76.118
2021-05-11 19:59:16,089 paramiko.transport INFO Connected (version 2.0, client OpenSSH_7.6p1)
2021-05-11 19:59:16,198 paramiko.transport INFO Authentication (publickey) successful!
2021-05-11 19:59:16,198 sagemaker-training-toolkit INFO Can connect to host algo-2
2021-05-11 19:59:16,198 sagemaker-training-toolkit INFO Worker algo-2 available for communication
2021-05-11 19:59:16,204 paramiko.transport INFO Connected (version 2.0, client OpenSSH_7.6p1)
2021-05-11 19:59:16,315 paramiko.transport INFO Authentication (publickey) successful!
2021-05-11 19:59:16,315 sagemaker-training-toolkit INFO Can connect to host algo-3
2021-05-11 19:59:16,316 sagemaker-training-toolkit INFO Worker algo-3 available for communication
2021-05-11 19:59:16,316 sagemaker-training-toolkit INFO Env Hosts: ['algo-1', 'algo-2', 'algo-3'] Hosts: ['algo-1:2', 'algo-2:2', 'algo-3:2'] process_per_hosts: 2 num_processes: 6
2021-05-11 19:59:16,317 sagemaker-training-toolkit INFO Network interface name: eth0
2021-05-11 19:59:16,323 sagemaker-training-toolkit INFO No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:16,332 sagemaker-training-toolkit INFO Invoking user script
Training Env:
{
"additional_framework_parameters": {
"sagemaker_mpi_num_of_processes_per_host": 2,
"sagemaker_mpi_custom_mpi_options": "",
"sagemaker_mpi_enabled": true
},
"channel_input_dirs": {},
"current_host": "algo-1",
"framework_module": "sagemaker_tensorflow_container.training:main",
"hosts": [
"algo-1",
"algo-2",
"algo-3"
],
"hyperparameters": {
"model_dir": "/opt/ml/model"
},
"input_config_dir": "/opt/ml/input/config",
"input_data_config": {},
"input_dir": "/opt/ml/input",
"is_master": true,
"job_name": "tensorflow-training-2021-05-11-19-56-11-281",
"log_level": 20,
"master_hostname": "algo-1",
"model_dir": "/opt/ml/model",
"module_dir": "s3://sagemaker-us-west-2-688520471316/tensorflow-training-2021-05-11-19-56-11-281/source/sourcedir.tar.gz",
"module_name": "mpi_demo",
"network_interface_name": "eth0",
"num_cpus": 8,
"num_gpus": 0,
"output_data_dir": "/opt/ml/output/data",
"output_dir": "/opt/ml/output",
"output_intermediate_dir": "/opt/ml/output/intermediate",
"resource_config": {
"current_host": "algo-1",
"hosts": [
"algo-1",
"algo-2",
"algo-3"
],
"network_interface_name": "eth0"
},
"user_entry_point": "mpi_demo.py"
}
Environment variables:
SM_HOSTS=["algo-1","algo-2","algo-3"]
SM_NETWORK_INTERFACE_NAME=eth0
SM_HPS={"model_dir":"/opt/ml/model"}
SM_USER_ENTRY_POINT=mpi_demo.py
SM_FRAMEWORK_PARAMS={"sagemaker_mpi_custom_mpi_options":"","sagemaker_mpi_enabled":true,"sagemaker_mpi_num_of_processes_per_host":2}
SM_RESOURCE_CONFIG={"current_host":"algo-1","hosts":["algo-1","algo-2","algo-3"],"network_interface_name":"eth0"}
SM_INPUT_DATA_CONFIG={}
SM_OUTPUT_DATA_DIR=/opt/ml/output/data
SM_CHANNELS=[]
SM_CURRENT_HOST=algo-1
SM_MODULE_NAME=mpi_demo
SM_LOG_LEVEL=20
SM_FRAMEWORK_MODULE=sagemaker_tensorflow_container.training:main
SM_INPUT_DIR=/opt/ml/input
SM_INPUT_CONFIG_DIR=/opt/ml/input/config
SM_OUTPUT_DIR=/opt/ml/output
SM_NUM_CPUS=8
SM_NUM_GPUS=0
SM_MODEL_DIR=/opt/ml/model
SM_MODULE_DIR=s3://sagemaker-us-west-2-688520471316/tensorflow-training-2021-05-11-19-56-11-281/source/sourcedir.tar.gz
SM_TRAINING_ENV={"additional_framework_parameters":{"sagemaker_mpi_custom_mpi_options":"","sagemaker_mpi_enabled":true,"sagemaker_mpi_num_of_processes_per_host":2},"channel_input_dirs":{},"current_host":"algo-1","framework_module":"sagemaker_tensorflow_container.training:main","hosts":["algo-1","algo-2","algo-3"],"hyperparameters":{"model_dir":"/opt/ml/model"},"input_config_dir":"/opt/ml/input/config","input_data_config":{},"input_dir":"/opt/ml/input","is_master":true,"job_name":"tensorflow-training-2021-05-11-19-56-11-281","log_level":20,"master_hostname":"algo-1","model_dir":"/opt/ml/model","module_dir":"s3://sagemaker-us-west-2-688520471316/tensorflow-training-2021-05-11-19-56-11-281/source/sourcedir.tar.gz","module_name":"mpi_demo","network_interface_name":"eth0","num_cpus":8,"num_gpus":0,"output_data_dir":"/opt/ml/output/data","output_dir":"/opt/ml/output","output_intermediate_dir":"/opt/ml/output/intermediate","resource_config":{"current_host":"algo-1","hosts":["algo-1","algo-2","algo-3"],"network_interface_name":"eth0"},"user_entry_point":"mpi_demo.py"}
SM_USER_ARGS=["--model_dir","/opt/ml/model"]
SM_OUTPUT_INTERMEDIATE_DIR=/opt/ml/output/intermediate
SM_HP_MODEL_DIR=/opt/ml/model
PYTHONPATH=/opt/ml/code:/usr/local/bin:/usr/local/lib/python37.zip:/usr/local/lib/python3.7:/usr/local/lib/python3.7/lib-dynload:/usr/local/lib/python3.7/site-packages
Invoking script with the following command:
mpirun --host algo-1:2,algo-2:2,algo-3:2 -np 6 --allow-run-as-root --display-map --tag-output -mca btl_tcp_if_include eth0 -mca oob_tcp_if_include eth0 -mca plm_rsh_no_tree_spawn 1 -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib -mca orte_abort_on_non_zero_status 1 -x NCCL_MIN_NRINGS=4 -x NCCL_SOCKET_IFNAME=eth0 -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH -x LD_PRELOAD=/usr/local/lib/python3.7/site-packages/gethostname.cpython-37m-x86_64-linux-gnu.so -x SM_HOSTS -x SM_NETWORK_INTERFACE_NAME -x SM_HPS -x SM_USER_ENTRY_POINT -x SM_FRAMEWORK_PARAMS -x SM_RESOURCE_CONFIG -x SM_INPUT_DATA_CONFIG -x SM_OUTPUT_DATA_DIR -x SM_CHANNELS -x SM_CURRENT_HOST -x SM_MODULE_NAME -x SM_LOG_LEVEL -x SM_FRAMEWORK_MODULE -x SM_INPUT_DIR -x SM_INPUT_CONFIG_DIR -x SM_OUTPUT_DIR -x SM_NUM_CPUS -x SM_NUM_GPUS -x SM_MODEL_DIR -x SM_MODULE_DIR -x SM_TRAINING_ENV -x SM_USER_ARGS -x SM_OUTPUT_INTERMEDIATE_DIR -x SM_HP_MODEL_DIR -x PYTHONPATH /usr/local/bin/python3.7 -m mpi4py mpi_demo.py --model_dir /opt/ml/model
Data for JOB [44607,1] offset 0 Total slots allocated 6
======================== JOB MAP ========================
Data for node: ip-10-0-97-76#011Num slots: 2#011Max slots: 0#011Num procs: 2
#011Process OMPI jobid: [44607,1] App: 0 Process rank: 0 Bound: N/A
#011Process OMPI jobid: [44607,1] App: 0 Process rank: 1 Bound: N/A
Data for node: algo-2#011Num slots: 2#011Max slots: 0#011Num procs: 2
#011Process OMPI jobid: [44607,1] App: 0 Process rank: 2 Bound: N/A
#011Process OMPI jobid: [44607,1] App: 0 Process rank: 3 Bound: N/A
Data for node: algo-3#011Num slots: 2#011Max slots: 0#011Num procs: 2
#011Process OMPI jobid: [44607,1] App: 0 Process rank: 4 Bound: N/A
#011Process OMPI jobid: [44607,1] App: 0 Process rank: 5 Bound: N/A
=============================================================
[1,1]<stdout>:Hello I am rank 1
[1,0]<stdout>:Number of MPI processes that will talk to each other: 6
[1,0]<stdout>:point to point
[1,1]<stdout>:I received some data: [1,1]<stdout>:[0 1 2]
[1,0]<stdout>:==================================================
[1,0]<stdout>:Broadcasting from rank 0
[1,3]<stdout>:Data at rank 3 [1,2]<stdout>:Data at rank 2 [1,2]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,3]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,1]<stdout>:Data at rank 1 [1,0]<stdout>:Data at rank 0 [1,1]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,0]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,4]<stdout>:Data at rank 4 [1,5]<stdout>:Data at rank 5 [1,4]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,5]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,0]<stdout>:==================================================
[1,0]<stdout>:Gather and reduce
[1,0]<stdout>:I am rank 0, data I gathered is: [[0 0 0 0 0 0 0 0 0 0]
[1,0]<stdout>: [1 1 1 1 1 1 1 1 1 1]
[1,0]<stdout>: [2 2 2 2 2 2 2 2 2 2]
[1,0]<stdout>: [3 3 3 3 3 3 3 3 3 3]
[1,0]<stdout>: [4 4 4 4 4 4 4 4 4 4]
[1,0]<stdout>: [5 5 5 5 5 5 5 5 5 5]]
[1,0]<stdout>:I am rank 0, my avg is: [2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5]
[1,1]<stdout>:I am rank 1, my avg is: [2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5]
[1,4]<stdout>:I am rank 4, my avg is: [2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5]
[1,5]<stdout>:I am rank 5, my avg is: [2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5]
[1,2]<stdout>:I am rank 2, my avg is: [2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5]
[1,3]<stdout>:I am rank 3, my avg is: [2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5]
2021-05-11 19:59:19,754 sagemaker_tensorflow_container.training WARNING No model artifact is saved under path /opt/ml/model. Your training job will not save any model files to S3.
For details of how to construct your training script see:
https://sagemaker.readthedocs.io/en/stable/using_tf.html#adapting-your-local-tensorflow-script
2021-05-11 19:59:19,754 sagemaker-training-toolkit INFO Reporting training SUCCESS
2021-05-11 19:59:49,799 sagemaker-training-toolkit INFO MPI process finished.
2021-05-11 19:59:49,800 sagemaker_tensorflow_container.training WARNING No model artifact is saved under path /opt/ml/model. Your training job will not save any model files to S3.
For details of how to construct your training script see:
https://sagemaker.readthedocs.io/en/stable/using_tf.html#adapting-your-local-tensorflow-script
2021-05-11 19:59:49,800 sagemaker-training-toolkit INFO Reporting training SUCCESS
2021-05-11 19:59:49,782 sagemaker-training-toolkit INFO MPI process finished.
2021-05-11 19:59:49,783 sagemaker_tensorflow_container.training WARNING No model artifact is saved under path /opt/ml/model. Your training job will not save any model files to S3.
For details of how to construct your training script see:
https://sagemaker.readthedocs.io/en/stable/using_tf.html#adapting-your-local-tensorflow-script
2021-05-11 19:59:49,783 sagemaker-training-toolkit INFO Reporting training SUCCESS
2021-05-11 19:59:58 Uploading - Uploading generated training model
2021-05-11 19:59:58 Completed - Training job completed
Training seconds: 270
Billable seconds: 270
The stdout “Number of MPI processes that will talk to each other: 6” indicates that the processes on all hosts are included in the comm world.
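Beyond reading that one line, you can also confirm that every rank actually produced output. Because mpirun was invoked with --tag-output, each piece of stdout carries a prefix of the form [job,rank]<stdout>:. The sketch below is one way to collect the ranks from such text; the helper name ranks_in_log is hypothetical, and the sample is a few lines copied from the log above rather than a live log stream.

```python
import re

# Matches the prefix that --tag-output adds, e.g. "[1,3]<stdout>:"
TAG = re.compile(r"\[(\d+),(\d+)\]<stdout>:")


def ranks_in_log(log_text):
    """Return the set of MPI ranks that wrote to stdout."""
    return {int(rank) for _job, rank in TAG.findall(log_text)}


sample = """\
[1,1]<stdout>:Hello I am rank 1
[1,0]<stdout>:Number of MPI processes that will talk to each other: 6
[1,3]<stdout>:Data at rank 3 [1,2]<stdout>:Data at rank 2 [1,2]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,4]<stdout>:Data at rank 4 [1,5]<stdout>:Data at rank 5 [1,4]<stdout>:[0 1 2 3 4 5 6 7 8 9]
"""

seen = ranks_in_log(sample)
missing = set(range(6)) - seen
print("ranks seen:", sorted(seen))
print("ranks missing:", sorted(missing))
```

Searching for the tag anywhere in a line, rather than only at its start, matters here because output from different ranks is interleaved on the same line.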
Conclusion
In this notebook, you went through some fundamental MPI operations, which underlie the inner workings of many distributed training frameworks. You did that on SageMaker with multiple instances. You can scale this setup to include more instances in a real ML project.