Introduction to MPI on Amazon SageMaker

Message Passing Interface (MPI) is a standard communication protocol for programming parallel computers; see its Wikipedia page for background. Open MPI is an open-source implementation of the MPI standard that is commonly used as a basic building block of distributed training systems.

In Python programs, you can use Open MPI through mpi4py, which makes it easy to convert a single-process Python program into a parallel one.

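Parallel processes can run on one host (e.g., one EC2 instance) or on multiple hosts (e.g., many EC2 instances). It's trivial to set up a parallel cluster (a comm world, in MPI parlance) on one host with Open MPI, but it is less straightforward to set one up across multiple instances: every instance needs the same software environment, mpirun must be told which hosts to use and how many processes to start on each, and the hosts must be able to reach one another, typically over SSH.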
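To make the multi-host requirements concrete: a manual launch has to list every host together with its slot count. The sketch below only builds the mpirun argument list and does not run anything. It assumes the hosts already accept passwordless SSH from the launching machine; the host names are placeholders, and the helper name build_mpirun_command is hypothetical. The --host name:slots syntax is the same one SageMaker uses in the training log later in this notebook.

```python
def build_mpirun_command(hosts, processes_per_host, script):
    """Build (but do not run) an Open MPI launch command spanning several hosts.

    hosts: host names assumed reachable via passwordless SSH.
    processes_per_host: number of MPI processes (slots) to start on each host.
    """
    if not hosts or processes_per_host < 1:
        raise ValueError("need at least one host and one process per host")
    # Open MPI accepts --host name:slots,name:slots,...
    host_arg = ",".join(f"{host}:{processes_per_host}" for host in hosts)
    # The comm world contains every process on every host
    world_size = len(hosts) * processes_per_host
    return ["mpirun", "--host", host_arg, "-np", str(world_size),
            "python", script]


if __name__ == "__main__":
    cmd = build_mpirun_command(["algo-1", "algo-2", "algo-3"], 2, "mpi_demo.py")
    # prints: mpirun --host algo-1:2,algo-2:2,algo-3:2 -np 6 python mpi_demo.py
    print(" ".join(cmd))
```

Building the command is the easy part; the SSH keys, host discovery and identical environments on every instance are what SageMaker takes care of.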

SageMaker handles this setup for you. In this tutorial, you will go through a few basic (but exceedingly important) MPI communication patterns on SageMaker with multiple instances, and you will verify that parallel processes on different instances are indeed talking to each other. These basic communications are the fundamental building blocks of distributed training.

Environment

We assume Open MPI and mpi4py are installed in your environment, which is the case on SageMaker notebook instances and in SageMaker Studio.
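If you are unsure whether your environment qualifies, a quick check is to look for the mpi4py package and the mpirun launcher. A minimal sketch using only the Python standard library (the function name check_mpi_environment is hypothetical):

```python
import importlib.util
import shutil


def check_mpi_environment():
    """Report whether mpi4py is importable and an mpirun launcher is on PATH.

    Uses only the standard library, so it runs even when mpi4py is missing.
    """
    return {
        # find_spec returns None when the package cannot be found
        "mpi4py": importlib.util.find_spec("mpi4py") is not None,
        # shutil.which returns the executable's full path, or None
        "mpirun": shutil.which("mpirun") is not None,
    }


if __name__ == "__main__":
    for name, found in check_mpi_environment().items():
        print(f"{name}: {'found' if found else 'NOT found'}")
```

If either entry reports NOT found, install Open MPI and mpi4py before running the cells below.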

Inspect the Python Program

[1]:
!pygmentize mpi_demo.py
from mpi4py import MPI
import numpy as np
import time

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    print("Number of MPI processes that will talk to each other:", size)


def point_to_point():
    """Point to point communication
    Send a numpy array (buffer like object) from rank 0 to rank 1
    """
    if rank == 0:
        print('point to point')
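        data = np.array([0, 1, 2], dtype=np.intc)  # int in C

        # Note the difference between the upper-case and lower-case APIs:
        # upper-case methods (Send, Recv, Bcast, ...) operate directly on
        # buffer-like objects such as NumPy arrays and map closely to the
        # C API, so they are fast; lower-case methods (send, recv, bcast, ...)
        # pickle arbitrary Python objects, which is more flexible but slower.
        # See https://mpi4py.readthedocs.io/en/stable/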

        comm.Send([data, MPI.INT], dest=1)
    elif rank == 1:
        print(f'Hello I am rank {rank}')
        data = np.empty(3, dtype=np.intc)
        comm.Recv([data, MPI.INT], source=0)
        print('I received some data:', data)

    if rank == 0:
        # Crude pause so the other ranks' output usually appears before the
        # separator line; MPI does not guarantee the ordering of stdout
        time.sleep(1)
        print("="*50)
    return


def broadcast():
    """Broadcast a numpy array from rank 0 to others"""

    if rank == 0:
        print(f'Broadcasting from rank {rank}')
        data = np.arange(10, dtype=np.intc)
    else:
        data = np.empty(10, dtype=np.intc)

    comm.Bcast([data, MPI.INT], root=0)
    print(f"Data at rank {rank}", data)

    if rank == 0:
        time.sleep(1)
        print("="*50)
    return


def gather_reduce_broadcast():
    """Gather numpy arrays from all ranks to rank 0,
    average them there, and broadcast the result to the other ranks.

    This is a useful pattern in distributed training:
    train a model in several MPI workers on different
    input data, average the weights on rank 0, then
    synchronize the weights on the other ranks.
    """

    # stuff to gather at each rank
    sendbuf = np.zeros(10, dtype=np.intc) + rank
    recvbuf = None

    if rank == 0:
        print('Gather and reduce')
        recvbuf = np.empty([size, 10], dtype=np.intc)
    comm.Gather(sendbuf, recvbuf, root=0)

    if rank == 0:
        print(f"I am rank {rank}, data I gathered is: {recvbuf}")

        # take the element-wise average across ranks;
        # think of it as a prototype of
        # averaging weights, gradients, etc.
        avg = np.mean(recvbuf, axis=0, dtype=np.float64)

    else:
        # receive the averaged array from rank 0;
        # think of it as a prototype of
        # synchronizing weights across MPI processes
        avg = np.empty(10, dtype=np.float64)

    # The data type is a 64-bit float here because we took an average,
    # so the matching MPI datatype is MPI.DOUBLE (MPI.FLOAT is 32-bit)
    comm.Bcast([avg, MPI.DOUBLE], root=0)

    print(f'I am rank {rank}, my avg is: {avg}')
    return


if __name__ == '__main__':
    point_to_point()
    broadcast()
    gather_reduce_broadcast()
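Each upper-case call in the listing pairs a NumPy buffer with an MPI datatype, as in [data, MPI.INT]. When no count is given, mpi4py infers the number of elements from the buffer's byte size divided by the datatype's size, so the datatype must match the array's element type: np.intc goes with MPI.INT, and the averaged array, which holds 64-bit floats, goes with MPI.DOUBLE rather than the 32-bit MPI.FLOAT. The stdlib-only sketch below involves no MPI; it just does that arithmetic with native C type sizes from the struct module (the helper name elements_seen is hypothetical).

```python
import struct

# Native sizes of the C types behind the MPI datatypes used in mpi_demo.py:
# 'i' = C int (MPI.INT), 'f' = C float (MPI.FLOAT), 'd' = C double (MPI.DOUBLE)
C_SIZES = {
    "MPI.INT": struct.calcsize("i"),
    "MPI.FLOAT": struct.calcsize("f"),
    "MPI.DOUBLE": struct.calcsize("d"),
}


def elements_seen(n_elements, array_itemsize, mpi_type):
    """Number of elements MPI sees when an array is described as mpi_type.

    The count is derived as buffer byte size // datatype size.
    """
    nbytes = n_elements * array_itemsize
    if nbytes % C_SIZES[mpi_type]:
        raise ValueError("buffer size is not a multiple of the datatype size")
    return nbytes // C_SIZES[mpi_type]


if __name__ == "__main__":
    # avg holds 10 float64 values, 8 bytes each
    print("as MPI.DOUBLE:", elements_seen(10, 8, "MPI.DOUBLE"))  # 10
    print("as MPI.FLOAT: ", elements_seen(10, 8, "MPI.FLOAT"))   # 20
```

With the 32-bit type, MPI sees 20 elements instead of 10. A plain copy such as Bcast still moves the same bytes when both sides use the same description, but any operation that interprets the values, such as a reduction, would treat each 8-byte double as two unrelated 4-byte floats.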

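Run the program with 2 parallel processes in your current environment. Make sure the machine has at least 2 CPU cores: by default, Open MPI refuses to start more processes than there are available slots (usually one per core) unless you pass --oversubscribe.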

[2]:
!mpirun -np 2 python mpi_demo.py
Number of MPI processes that will talk to each other:  2
point to point
Hello I am rank 1
I received some data: [0 1 2]
==================================================
Broadcasting from rank 0
Data at rank 0 [0 1 2 3 4 5 6 7 8 9]
Data at rank 1 [0 1 2 3 4 5 6 7 8 9]
==================================================
Gather and reduce
I am rank 0, data I gathered is: [[0 0 0 0 0 0 0 0 0 0]
 [1 1 1 1 1 1 1 1 1 1]]
I am rank 0, my avg is: [0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5]
I am rank 1, my avg is: [0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5]
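The averages above can be checked by hand. The sketch below is a serial, stdlib-only simulation of what gather_reduce_broadcast() computes; no MPI is involved, and the function name is hypothetical. Each rank contributes an array filled with its own rank number, rank 0 averages the gathered rows element-wise, and every rank ends up with the same result.

```python
def simulate_gather_average_broadcast(world_size, length=10):
    """Serially mimic gather_reduce_broadcast() from mpi_demo.py (no MPI involved).

    Each "rank" is just an index; returns {rank: averaged list}.
    """
    # Each rank's sendbuf: `length` copies of its own rank number
    sendbufs = [[rank] * length for rank in range(world_size)]
    # Gather: the root ends up with one row per rank, ordered by rank
    gathered = [list(row) for row in sendbufs]
    # Reduce on the root: element-wise mean down the rank axis (axis=0)
    avg = [sum(column) / len(column) for column in zip(*gathered)]
    # Broadcast: every rank receives its own copy of the root's result
    return {rank: list(avg) for rank in range(world_size)}


if __name__ == "__main__":
    print(simulate_gather_average_broadcast(2)[1])  # ten copies of 0.5
    print(simulate_gather_average_broadcast(6)[5])  # ten copies of 2.5
```

In general the result is (size - 1) / 2, because the ranks are 0, 1, ..., size - 1. MPI also provides an Allreduce collective (comm.Allreduce in mpi4py) that sums across ranks and delivers the result to every rank in one call; dividing by the world size then gives the same average.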

Scale it on SageMaker

You can run the above program with \(n\) processes per host across \(N\) hosts on SageMaker, which gives a comm world of size \(n\times N\). In the remainder of this notebook, you will use the SageMaker TensorFlow deep learning container (DLC) to run the above program. There is no particular reason for this choice: all SageMaker deep learning containers have Open MPI installed, so feel free to replace it with your favorite DLC.
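Which host each rank lands on depends on the mapping policy. SageMaker passes -map-by slot to mpirun (visible in the training log below), which fills every slot on the first host before moving to the next, so rank r runs on host r // n. A small sketch, assuming every host has exactly n slots (the function name is hypothetical):

```python
def rank_placement(hosts, processes_per_host):
    """Return {rank: host}, assuming -map-by slot and equal slots per host.

    Slot mapping fills all slots on the first host, then the second, and so on.
    """
    world_size = len(hosts) * processes_per_host  # n * N
    return {rank: hosts[rank // processes_per_host] for rank in range(world_size)}


if __name__ == "__main__":
    for rank, host in rank_placement(["algo-1", "algo-2", "algo-3"], 2).items():
        print(f"rank {rank} -> {host}")
```

For the 3-host, 2-process job in this notebook this gives ranks 0 and 1 on algo-1, ranks 2 and 3 on algo-2, and ranks 4 and 5 on algo-3, which matches the JOB MAP that Open MPI prints (algo-1 appears there under its IP-based name, ip-10-0-97-76).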

See the SageMaker Python SDK documentation for the parameters needed to set up a distributed training job with MPI.
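The distribution argument passed to the estimator is a plain dictionary. As a sketch, a small helper can build it and catch mistakes early. The custom_mpi_options key, which carries extra options for the MPI launch, is part of the SageMaker Python SDK's MPI configuration (it shows up as sagemaker_mpi_custom_mpi_options in the training log below); the helper functions themselves are hypothetical.

```python
def make_mpi_distribution(processes_per_host, custom_mpi_options=""):
    """Build the `distribution` dict for a SageMaker estimator (hypothetical helper)."""
    if not isinstance(processes_per_host, int) or processes_per_host < 1:
        raise ValueError("processes_per_host must be a positive integer")
    mpi = {"enabled": True, "processes_per_host": processes_per_host}
    if custom_mpi_options:
        # Extra options that the training container passes on to mpirun
        mpi["custom_mpi_options"] = custom_mpi_options
    return {"mpi": mpi}


def expected_world_size(instance_count, processes_per_host):
    """Size of the comm world: processes per host times number of instances."""
    return instance_count * processes_per_host


if __name__ == "__main__":
    print(make_mpi_distribution(2))   # same dict as used in the next cell
    print(expected_world_size(3, 2))  # 6
```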

[3]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.tensorflow import TensorFlow

role = get_execution_role()

# Running 2 processes per host
# if we use 3 instances,
# then we should see 6 MPI processes

distribution = {"mpi": {"enabled": True, "processes_per_host": 2}}

tfest = TensorFlow(
    entry_point="mpi_demo.py",
    role=role,
    framework_version="2.3.0",
    distribution=distribution,
    py_version="py37",
    instance_count=3,
    instance_type="ml.c5.2xlarge",  # 8 vCPUs per instance
    output_path="s3://" + sagemaker.Session().default_bucket() + "/" + "mpi",
)
[4]:
tfest.fit()
2021-05-11 19:56:11 Starting - Starting the training job...
2021-05-11 19:56:35 Starting - Launching requested ML instancesProfilerReport-1620762971: InProgress
......
2021-05-11 19:57:36 Starting - Preparing the instances for training......
2021-05-11 19:58:36 Downloading - Downloading input data...
2021-05-11 19:59:09 Training - Training image download completed. Training in progress..2021-05-11 19:59:14,435 sagemaker-training-toolkit INFO     Imported framework sagemaker_tensorflow_container.training
2021-05-11 19:59:14,442 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:14,937 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:14,951 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:14,959 sagemaker-training-toolkit INFO     Starting MPI run as worker node.
2021-05-11 19:59:14,959 sagemaker-training-toolkit INFO     Waiting for MPI Master to create SSH daemon.
2021-05-11 19:59:14,961 sagemaker-training-toolkit INFO     Cannot connect to host algo-1
2021-05-11 19:59:14,961 sagemaker-training-toolkit INFO     Connection failed with exception:
 [Errno None] Unable to connect to port 22 on 10.0.97.76
2021-05-11 19:59:15,967 paramiko.transport INFO     Connected (version 2.0, client OpenSSH_7.6p1)
2021-05-11 19:59:16,062 paramiko.transport INFO     Authentication (publickey) successful!
2021-05-11 19:59:16,062 sagemaker-training-toolkit INFO     Can connect to host algo-1
2021-05-11 19:59:16,062 sagemaker-training-toolkit INFO     MPI Master online, creating SSH daemon.
2021-05-11 19:59:16,062 sagemaker-training-toolkit INFO     Writing environment variables to /etc/environment for the MPI process.
2021-05-11 19:59:16,072 sagemaker-training-toolkit INFO     Waiting for MPI process to finish.
2021-05-11 19:59:13,928 sagemaker-training-toolkit INFO     Imported framework sagemaker_tensorflow_container.training
2021-05-11 19:59:13,935 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:14,398 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:14,413 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:14,421 sagemaker-training-toolkit INFO     Starting MPI run as worker node.
2021-05-11 19:59:14,422 sagemaker-training-toolkit INFO     Waiting for MPI Master to create SSH daemon.
2021-05-11 19:59:14,423 sagemaker-training-toolkit INFO     Cannot connect to host algo-1
2021-05-11 19:59:14,423 sagemaker-training-toolkit INFO     Connection failed with exception:
 [Errno None] Unable to connect to port 22 on 10.0.97.76
2021-05-11 19:59:15,430 paramiko.transport INFO     Connected (version 2.0, client OpenSSH_7.6p1)
2021-05-11 19:59:15,541 paramiko.transport INFO     Authentication (publickey) successful!
2021-05-11 19:59:15,542 sagemaker-training-toolkit INFO     Can connect to host algo-1
2021-05-11 19:59:15,542 sagemaker-training-toolkit INFO     MPI Master online, creating SSH daemon.
2021-05-11 19:59:15,542 sagemaker-training-toolkit INFO     Writing environment variables to /etc/environment for the MPI process.
2021-05-11 19:59:15,553 sagemaker-training-toolkit INFO     Waiting for MPI process to finish.
2021-05-11 19:59:14,525 sagemaker-training-toolkit INFO     Imported framework sagemaker_tensorflow_container.training
2021-05-11 19:59:14,532 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:15,051 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:15,065 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:15,074 sagemaker-training-toolkit INFO     Starting MPI run as worker node.
2021-05-11 19:59:15,074 sagemaker-training-toolkit INFO     Creating SSH daemon.
2021-05-11 19:59:15,080 sagemaker-training-toolkit INFO     Waiting for MPI workers to establish their SSH connections
2021-05-11 19:59:15,082 sagemaker-training-toolkit INFO     Cannot connect to host algo-2
2021-05-11 19:59:15,082 sagemaker-training-toolkit INFO     Connection failed with exception:
 [Errno None] Unable to connect to port 22 on 10.0.76.118
2021-05-11 19:59:16,089 paramiko.transport INFO     Connected (version 2.0, client OpenSSH_7.6p1)
2021-05-11 19:59:16,198 paramiko.transport INFO     Authentication (publickey) successful!
2021-05-11 19:59:16,198 sagemaker-training-toolkit INFO     Can connect to host algo-2
2021-05-11 19:59:16,198 sagemaker-training-toolkit INFO     Worker algo-2 available for communication
2021-05-11 19:59:16,204 paramiko.transport INFO     Connected (version 2.0, client OpenSSH_7.6p1)
2021-05-11 19:59:16,315 paramiko.transport INFO     Authentication (publickey) successful!
2021-05-11 19:59:16,315 sagemaker-training-toolkit INFO     Can connect to host algo-3
2021-05-11 19:59:16,316 sagemaker-training-toolkit INFO     Worker algo-3 available for communication
2021-05-11 19:59:16,316 sagemaker-training-toolkit INFO     Env Hosts: ['algo-1', 'algo-2', 'algo-3'] Hosts: ['algo-1:2', 'algo-2:2', 'algo-3:2'] process_per_hosts: 2 num_processes: 6
2021-05-11 19:59:16,317 sagemaker-training-toolkit INFO     Network interface name: eth0
2021-05-11 19:59:16,323 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2021-05-11 19:59:16,332 sagemaker-training-toolkit INFO     Invoking user script

Training Env:

{
    "additional_framework_parameters": {
        "sagemaker_mpi_num_of_processes_per_host": 2,
        "sagemaker_mpi_custom_mpi_options": "",
        "sagemaker_mpi_enabled": true
    },
    "channel_input_dirs": {},
    "current_host": "algo-1",
    "framework_module": "sagemaker_tensorflow_container.training:main",
    "hosts": [
        "algo-1",
        "algo-2",
        "algo-3"
    ],
    "hyperparameters": {
        "model_dir": "/opt/ml/model"
    },
    "input_config_dir": "/opt/ml/input/config",
    "input_data_config": {},
    "input_dir": "/opt/ml/input",
    "is_master": true,
    "job_name": "tensorflow-training-2021-05-11-19-56-11-281",
    "log_level": 20,
    "master_hostname": "algo-1",
    "model_dir": "/opt/ml/model",
    "module_dir": "s3://sagemaker-us-west-2-688520471316/tensorflow-training-2021-05-11-19-56-11-281/source/sourcedir.tar.gz",
    "module_name": "mpi_demo",
    "network_interface_name": "eth0",
    "num_cpus": 8,
    "num_gpus": 0,
    "output_data_dir": "/opt/ml/output/data",
    "output_dir": "/opt/ml/output",
    "output_intermediate_dir": "/opt/ml/output/intermediate",
    "resource_config": {
        "current_host": "algo-1",
        "hosts": [
            "algo-1",
            "algo-2",
            "algo-3"
        ],
        "network_interface_name": "eth0"
    },
    "user_entry_point": "mpi_demo.py"
}

Environment variables:

SM_HOSTS=["algo-1","algo-2","algo-3"]
SM_NETWORK_INTERFACE_NAME=eth0
SM_HPS={"model_dir":"/opt/ml/model"}
SM_USER_ENTRY_POINT=mpi_demo.py
SM_FRAMEWORK_PARAMS={"sagemaker_mpi_custom_mpi_options":"","sagemaker_mpi_enabled":true,"sagemaker_mpi_num_of_processes_per_host":2}
SM_RESOURCE_CONFIG={"current_host":"algo-1","hosts":["algo-1","algo-2","algo-3"],"network_interface_name":"eth0"}
SM_INPUT_DATA_CONFIG={}
SM_OUTPUT_DATA_DIR=/opt/ml/output/data
SM_CHANNELS=[]
SM_CURRENT_HOST=algo-1
SM_MODULE_NAME=mpi_demo
SM_LOG_LEVEL=20
SM_FRAMEWORK_MODULE=sagemaker_tensorflow_container.training:main
SM_INPUT_DIR=/opt/ml/input
SM_INPUT_CONFIG_DIR=/opt/ml/input/config
SM_OUTPUT_DIR=/opt/ml/output
SM_NUM_CPUS=8
SM_NUM_GPUS=0
SM_MODEL_DIR=/opt/ml/model
SM_MODULE_DIR=s3://sagemaker-us-west-2-688520471316/tensorflow-training-2021-05-11-19-56-11-281/source/sourcedir.tar.gz
SM_TRAINING_ENV={"additional_framework_parameters":{"sagemaker_mpi_custom_mpi_options":"","sagemaker_mpi_enabled":true,"sagemaker_mpi_num_of_processes_per_host":2},"channel_input_dirs":{},"current_host":"algo-1","framework_module":"sagemaker_tensorflow_container.training:main","hosts":["algo-1","algo-2","algo-3"],"hyperparameters":{"model_dir":"/opt/ml/model"},"input_config_dir":"/opt/ml/input/config","input_data_config":{},"input_dir":"/opt/ml/input","is_master":true,"job_name":"tensorflow-training-2021-05-11-19-56-11-281","log_level":20,"master_hostname":"algo-1","model_dir":"/opt/ml/model","module_dir":"s3://sagemaker-us-west-2-688520471316/tensorflow-training-2021-05-11-19-56-11-281/source/sourcedir.tar.gz","module_name":"mpi_demo","network_interface_name":"eth0","num_cpus":8,"num_gpus":0,"output_data_dir":"/opt/ml/output/data","output_dir":"/opt/ml/output","output_intermediate_dir":"/opt/ml/output/intermediate","resource_config":{"current_host":"algo-1","hosts":["algo-1","algo-2","algo-3"],"network_interface_name":"eth0"},"user_entry_point":"mpi_demo.py"}
SM_USER_ARGS=["--model_dir","/opt/ml/model"]
SM_OUTPUT_INTERMEDIATE_DIR=/opt/ml/output/intermediate
SM_HP_MODEL_DIR=/opt/ml/model
PYTHONPATH=/opt/ml/code:/usr/local/bin:/usr/local/lib/python37.zip:/usr/local/lib/python3.7:/usr/local/lib/python3.7/lib-dynload:/usr/local/lib/python3.7/site-packages

Invoking script with the following command:

mpirun --host algo-1:2,algo-2:2,algo-3:2 -np 6 --allow-run-as-root --display-map --tag-output -mca btl_tcp_if_include eth0 -mca oob_tcp_if_include eth0 -mca plm_rsh_no_tree_spawn 1 -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib -mca orte_abort_on_non_zero_status 1 -x NCCL_MIN_NRINGS=4 -x NCCL_SOCKET_IFNAME=eth0 -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH -x LD_PRELOAD=/usr/local/lib/python3.7/site-packages/gethostname.cpython-37m-x86_64-linux-gnu.so -x SM_HOSTS -x SM_NETWORK_INTERFACE_NAME -x SM_HPS -x SM_USER_ENTRY_POINT -x SM_FRAMEWORK_PARAMS -x SM_RESOURCE_CONFIG -x SM_INPUT_DATA_CONFIG -x SM_OUTPUT_DATA_DIR -x SM_CHANNELS -x SM_CURRENT_HOST -x SM_MODULE_NAME -x SM_LOG_LEVEL -x SM_FRAMEWORK_MODULE -x SM_INPUT_DIR -x SM_INPUT_CONFIG_DIR -x SM_OUTPUT_DIR -x SM_NUM_CPUS -x SM_NUM_GPUS -x SM_MODEL_DIR -x SM_MODULE_DIR -x SM_TRAINING_ENV -x SM_USER_ARGS -x SM_OUTPUT_INTERMEDIATE_DIR -x SM_HP_MODEL_DIR -x PYTHONPATH /usr/local/bin/python3.7 -m mpi4py mpi_demo.py --model_dir /opt/ml/model


 Data for JOB [44607,1] offset 0 Total slots allocated 6

 ========================   JOB MAP   ========================

 Data for node: ip-10-0-97-76    Num slots: 2    Max slots: 0    Num procs: 2
     Process OMPI jobid: [44607,1] App: 0 Process rank: 0 Bound: N/A
     Process OMPI jobid: [44607,1] App: 0 Process rank: 1 Bound: N/A

 Data for node: algo-2    Num slots: 2    Max slots: 0    Num procs: 2
     Process OMPI jobid: [44607,1] App: 0 Process rank: 2 Bound: N/A
     Process OMPI jobid: [44607,1] App: 0 Process rank: 3 Bound: N/A

 Data for node: algo-3    Num slots: 2    Max slots: 0    Num procs: 2
     Process OMPI jobid: [44607,1] App: 0 Process rank: 4 Bound: N/A
     Process OMPI jobid: [44607,1] App: 0 Process rank: 5 Bound: N/A

 =============================================================
[1,1]<stdout>:Hello I am rank 1
[1,0]<stdout>:Number of MPI processes that will talk to each other:  6
[1,0]<stdout>:point to point
[1,1]<stdout>:I received some data: [1,1]<stdout>:[0 1 2]
[1,0]<stdout>:==================================================
[1,0]<stdout>:Broadcasting from rank 0
[1,3]<stdout>:Data at rank 3 [1,2]<stdout>:Data at rank 2 [1,2]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,3]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,1]<stdout>:Data at rank 1 [1,0]<stdout>:Data at rank 0 [1,1]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,0]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,4]<stdout>:Data at rank 4 [1,5]<stdout>:Data at rank 5 [1,4]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,5]<stdout>:[0 1 2 3 4 5 6 7 8 9]
[1,0]<stdout>:==================================================
[1,0]<stdout>:Gather and reduce
[1,0]<stdout>:I am rank 0, data I gathered is: [[0 0 0 0 0 0 0 0 0 0]
[1,0]<stdout>: [1 1 1 1 1 1 1 1 1 1]
[1,0]<stdout>: [2 2 2 2 2 2 2 2 2 2]
[1,0]<stdout>: [3 3 3 3 3 3 3 3 3 3]
[1,0]<stdout>: [4 4 4 4 4 4 4 4 4 4]
[1,0]<stdout>: [5 5 5 5 5 5 5 5 5 5]]
[1,0]<stdout>:I am rank 0, my avg is: [2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5]
[1,1]<stdout>:I am rank 1, my avg is: [2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5]
[1,4]<stdout>:I am rank 4, my avg is: [2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5]
[1,5]<stdout>:I am rank 5, my avg is: [2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5]
[1,2]<stdout>:I am rank 2, my avg is: [2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5]
[1,3]<stdout>:I am rank 3, my avg is: [2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5 2.5]
2021-05-11 19:59:19,754 sagemaker_tensorflow_container.training WARNING  No model artifact is saved under path /opt/ml/model. Your training job will not save any model files to S3.
For details of how to construct your training script see:
https://sagemaker.readthedocs.io/en/stable/using_tf.html#adapting-your-local-tensorflow-script
2021-05-11 19:59:19,754 sagemaker-training-toolkit INFO     Reporting training SUCCESS
2021-05-11 19:59:49,799 sagemaker-training-toolkit INFO     MPI process finished.
2021-05-11 19:59:49,800 sagemaker_tensorflow_container.training WARNING  No model artifact is saved under path /opt/ml/model. Your training job will not save any model files to S3.
For details of how to construct your training script see:
https://sagemaker.readthedocs.io/en/stable/using_tf.html#adapting-your-local-tensorflow-script
2021-05-11 19:59:49,800 sagemaker-training-toolkit INFO     Reporting training SUCCESS
2021-05-11 19:59:49,782 sagemaker-training-toolkit INFO     MPI process finished.
2021-05-11 19:59:49,783 sagemaker_tensorflow_container.training WARNING  No model artifact is saved under path /opt/ml/model. Your training job will not save any model files to S3.
For details of how to construct your training script see:
https://sagemaker.readthedocs.io/en/stable/using_tf.html#adapting-your-local-tensorflow-script
2021-05-11 19:59:49,783 sagemaker-training-toolkit INFO     Reporting training SUCCESS

2021-05-11 19:59:58 Uploading - Uploading generated training model
2021-05-11 19:59:58 Completed - Training job completed
Training seconds: 270
Billable seconds: 270

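The stdout line “Number of MPI processes that will talk to each other: 6” shows that the processes on all three hosts belong to the same comm world. The job map confirms the placement (ranks 0 and 1 on algo-1, listed as ip-10-0-97-76; ranks 2 and 3 on algo-2; ranks 4 and 5 on algo-3), the array gathered on rank 0 contains one row from each of the six ranks, and every rank printed the broadcast average of 2.5.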
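Because SageMaker invokes mpirun with --tag-output, each chunk of a process's output is prefixed with [jobid,rank]<stdout>:, and chunks from different ranks can share one line, as in the "Data at rank 3" lines above. To check programmatically which rank printed what, a sketch like the following can regroup such lines by rank. It uses only the standard library; the function name and the line-continuation heuristic are assumptions, not documented Open MPI behavior.

```python
import re
from collections import defaultdict

# Matches the prefix that `mpirun --tag-output` puts before each output chunk
TAG = re.compile(r"\[(\d+),(\d+)\]<stdout>:")


def group_by_rank(log_lines):
    """Reassemble interleaved --tag-output lines into complete lines per rank.

    Heuristic: a chunk followed by another tag on the same physical line is
    treated as unfinished; the last chunk on a line completes that rank's line.
    Lines without a tag (e.g. SageMaker toolkit logs) are ignored.
    """
    pending = defaultdict(str)  # rank -> text of a line still being built
    result = defaultdict(list)  # rank -> completed lines
    for line in log_lines:
        # parts = [prefix, job, rank, text, job, rank, text, ...]
        parts = TAG.split(line.rstrip("\n"))
        chunks = [(int(parts[i + 1]), parts[i + 2])
                  for i in range(1, len(parts) - 2, 3)]
        for k, (rank, text) in enumerate(chunks):
            pending[rank] += text
            if k == len(chunks) - 1:  # last chunk on this physical line
                result[rank].append(pending.pop(rank))
    return dict(result)


if __name__ == "__main__":
    sample = [
        "[1,3]<stdout>:Data at rank 3 [1,2]<stdout>:Data at rank 2 "
        "[1,2]<stdout>:[0 1 2 3 4 5 6 7 8 9]",
        "[1,3]<stdout>:[0 1 2 3 4 5 6 7 8 9]",
    ]
    for rank, lines in sorted(group_by_rank(sample).items()):
        print(rank, lines)
```

Fed the log above, each of ranks 0 to 5 should end up with its own "my avg is: [2.5 ...]" line, confirming that all six processes received rank 0's broadcast.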

Conclusion

In this notebook, you went through some fundamental MPI operations, which are the building blocks of the inner workings of many distributed training frameworks, and you ran them on SageMaker across multiple instances. In a real ML project, you can scale this setup to more instances by increasing instance_count.