Ingest Data with EMR

This notebook demonstrates how to read data from Amazon S3 into an EMR cluster for processing with Spark.

Amazon EMR is the industry-leading cloud big data platform for processing vast amounts of data using open-source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto. With EMR you can run petabyte-scale analysis at less than half the cost of traditional on-premises solutions and over 3x faster than standard Apache Spark.

Set up Notebook

First, make sure you have the EMR cluster set up and the connection between EMR and the SageMaker notebook configured correctly. You can follow the documentation and procedure to set up this notebook. Once setup is complete, restart the kernel and run the following command to check whether the EMR and SageMaker connection is configured correctly.

[ ]:
%%info
[ ]:
%%local
import sagemaker
from sklearn.datasets import load_boston
import pandas as pd

sagemaker_session = sagemaker.Session()
s3 = sagemaker_session.boto_session.resource("s3")
bucket = sagemaker_session.default_bucket()  # replace with your own bucket name if you have one
prefix = "data/tabular/boston_house"
filename = "boston_house.csv"

Download data from online resources and write data to S3

[ ]:
%%local
# helper functions to upload data to S3
def write_to_s3(filename, bucket, prefix):
    # put each file in its own folder; this is helpful if you read and prepare data with Athena
    filename_key = filename.split(".")[0]
    key = "{}/{}/{}".format(prefix, filename_key, filename)
    return s3.Bucket(bucket).upload_file(filename, key)


def upload_to_s3(bucket, prefix, filename):
    # build the same key as write_to_s3 so the printed URL matches the uploaded object
    url = "s3://{}/{}/{}/{}".format(bucket, prefix, filename.split(".")[0], filename)
    print("Writing to {}".format(url))
    write_to_s3(filename, bucket, prefix)
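As a quick local check, the key layout that `write_to_s3` produces can be sketched without touching S3; the `prefix` and `filename` values below mirror the ones defined in the setup cell:

```python
# Sketch of the S3 object key that write_to_s3 constructs; no upload happens here.
prefix = "data/tabular"
filename = "boston_house.csv"

# Each file goes into its own folder named after the file stem,
# which keeps the layout convenient for Athena.
filename_key = filename.split(".")[0]
key = "{}/{}/{}".format(prefix, filename_key, filename)
print(key)  # data/tabular/boston_house/boston_house.csv
```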
[ ]:
%%local
tabular_data = load_boston()
tabular_data_full = pd.DataFrame(tabular_data.data, columns=tabular_data.feature_names)
tabular_data_full["target"] = pd.DataFrame(tabular_data.target)
tabular_data_full.to_csv("boston_house.csv", index=False)

upload_to_s3(bucket, "data/tabular", filename)
[ ]:
%%local
data_s3_path = "s3://{}/{}/{}".format(bucket, prefix, filename)
print("This is the S3 path to your file: " + data_s3_path)

Copy the S3 bucket file path

The S3 bucket file path is required to read the data on EMR Spark. Copy and paste the path string shown above into the next cell.

[ ]:
### replace this path string with the path shown in the last step
data_s3_path = "s3://sagemaker-us-east-2-060356833389/data/tabular/boston_house/boston_house.csv"

Read the data in the EMR Spark cluster

Once we have the path to our data in S3, we can use Spark S3 Select to read the data with the following command. You can specify the data format; a schema is not required but is recommended; and in the options you can set the compression, delimiter, header, and so on. For more details, see the documentation on using S3 Select with Spark.

[ ]:
# EMR cell
schema = "CRIM double, ZN double, INDUS double, CHAS double, NOX double, RM double, \
AGE double, DIS double, RAD double, TAX double, PTRATIO double, B double, \
LSTAT double, target double"
df = spark.read.format("csv").schema(schema).options(header="true").load(data_s3_path)
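If typing the DDL schema string by hand feels error-prone, it can also be assembled from a column list. This is just a local convenience sketch; the column names are taken from the schema in the cell above, and all Boston Housing columns happen to be doubles:

```python
# Build the same DDL-style schema string from a list of column names.
# Since every column is a double, a single comprehension suffices.
columns = [
    "CRIM", "ZN", "INDUS", "CHAS", "NOX", "RM", "AGE",
    "DIS", "RAD", "TAX", "PTRATIO", "B", "LSTAT", "target",
]
schema = ", ".join("{} double".format(c) for c in columns)
print(schema)
```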
[ ]:
df.show(5)

Conclusion

Now that you have read in the data, you can pre-process it with Spark on an EMR cluster, build an ML pipeline, and train models at scale.

Citation

Boston Housing data: Harrison, D. and Rubinfeld, D. L., "Hedonic prices and the demand for clean air", Journal of Environmental Economics and Management, vol. 5, pp. 81-102, 1978.