Run PySpark locally on SageMaker Studio

This notebook shows you how to run PySpark code locally within a SageMaker Studio notebook. The dependencies are installed in the notebook, so you can run this notebook on any image/kernel, including BYO images. For this example, you can choose the Data Science image and Python 3 kernel.

[ ]:
# import sagemaker SDK
import sagemaker

print(sagemaker.__version__)
[ ]:
# setup - install JDK
# you only need to run this once per KernelApp
%conda install openjdk -y
[ ]:
# install PySpark
%pip install pyspark==3.1.1
[ ]:
# import PySpark and build Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)
  1. If running the cell above raises an exception similar to Exception: Java gateway process exited before sending the driver its port number, restart your JupyterServer app to make sure you're on the latest version of Studio.

  2. If you are running this notebook in SageMaker Studio, run the above cell as-is. If you are running on a SageMaker notebook instance, replace com.amazonaws.auth.ContainerCredentialsProvider with com.amazonaws.auth.InstanceProfileCredentialsProvider, as sketched below.
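For example, on a SageMaker notebook instance the session builder would look like the following (a minimal sketch; only the credentials provider changes from the cell above):

[ ]:
# on a notebook instance, S3 credentials come from the EC2 instance profile
spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.InstanceProfileCredentialsProvider",
    )
    .getOrCreate()
)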

Create and run user-defined functions

Now that you have installed PySpark and initiated a Spark session, let's try out a couple of sample pandas user-defined functions (UDFs).

[ ]:
from pyspark.sql.functions import (
    col,
    count,
    rand,
    collect_list,
    explode,
    struct,
    lit,
)
from pyspark.sql.functions import pandas_udf, PandasUDFType

# generate random data
df = (
    spark.range(0, 10 * 100 * 100)
    .withColumn("id", (col("id") / 100).cast("integer"))
    .withColumn("v", rand())
)
df.cache()
df.count()

df.show()
[ ]:
# sample pandas udf to return squared value
@pandas_udf("double", PandasUDFType.SCALAR)
def pandas_squared(v):
    return v * v


df.withColumn("v2", pandas_squared(df.v))

In this next example, you'll run ordinary least squares (OLS) linear regression by group using statsmodels.

[ ]:
df2 = (
    df.withColumn("y", rand())
    .withColumn("x1", rand())
    .withColumn("x2", rand())
    .select("id", "y", "x1", "x2")
)
df2.show()
[ ]:
import pandas as pd
import statsmodels.api as sm

group_column = "id"
y_column = "y"
x_columns = ["x1", "x2"]
schema = df2.select(group_column, *x_columns).schema
[ ]:
# sample UDF with input and output data frames
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = pdf[x_columns]
    X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()
    return pd.DataFrame(
        [[group_key] + [model.params[i] for i in x_columns]],
        columns=[group_column] + x_columns,
    )
[ ]:
# run ols grouped by the "id" group column
beta = df2.groupby(group_column).apply(ols)
beta.show()

Run Spark processing scripts locally

You can run Spark processing scripts on your notebook as shown below. You'll read the sample abalone dataset from an S3 location and perform preprocessing on it. You will:

  1. Apply transforms to the data, such as one-hot encoding and merging columns into a single vector
  2. Create a preprocessing pipeline
  3. Fit and transform the dataset
  4. Split it into training and validation sets
  5. Save the files to local storage

[ ]:
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
)
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
    VectorIndexer,
)
[ ]:
schema = StructType(
    [
        StructField("sex", StringType(), True),
        StructField("length", DoubleType(), True),
        StructField("diameter", DoubleType(), True),
        StructField("height", DoubleType(), True),
        StructField("whole_weight", DoubleType(), True),
        StructField("shucked_weight", DoubleType(), True),
        StructField("viscera_weight", DoubleType(), True),
        StructField("shell_weight", DoubleType(), True),
        StructField("rings", DoubleType(), True),
    ]
)
[ ]:
data_uri = "s3a://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv"

abalone_df = spark.read.csv(data_uri, header=False, schema=schema)
abalone_df.show(2)
[ ]:
# index the categorical sex column as numeric values
sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")

# one-hot encode the string-indexed sex column (indexed_sex)
sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

# the VectorAssembler combines all features into a single vector column so the result can be saved as CSV
assembler = VectorAssembler(
    inputCols=[
        "sex_vec",
        "length",
        "diameter",
        "height",
        "whole_weight",
        "shucked_weight",
        "viscera_weight",
        "shell_weight",
    ],
    outputCol="features",
)
[ ]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])
model = pipeline.fit(abalone_df)

# apply transforms to the data frame
transformed_df = model.transform(abalone_df)
transformed_df.show(2)
[ ]:
# split into train and validation sets (the files are written out in the next cell)
(train_df, validation_df) = transformed_df.randomSplit([0.8, 0.2])
[ ]:
# write features to csv
from pyspark.ml.functions import vector_to_array

# extract only rings and the 9 feature columns (2 from the one-hot encoded sex vector + 7 numeric columns)
train_df_final = train_df.withColumn("feature", vector_to_array("features")).select(
    ["rings"] + [col("feature")[i] for i in range(9)]
)

val_df_final = validation_df.withColumn("feature", vector_to_array("features")).select(
    ["rings"] + [col("feature")[i] for i in range(9)]
)

# write to csv
train_df_final.write.csv("train")
val_df_final.write.csv("validation")

Print the first five rows of the preprocessed output file.

[ ]:
import os
import pandas as pd

files = os.listdir("./train")
csv_files = [f for f in files if f.endswith(".csv")]

print("Top 5 rows from the train file")
pd.read_csv(f"./train/{csv_files[0]}", header=None).head(5)

Run the script as a SageMaker processing job

Once experimentation is complete, you can run the script as a SageMaker processing job. SageMaker processing jobs let you perform data pre-processing, post-processing, feature engineering, and data validation on infrastructure fully managed by SageMaker.

The ./code/preprocess.py script packages the preprocessing we've done locally above into a standalone script that can run as a SageMaker processing job. Let's view the file contents below.

[ ]:
!pygmentize ./code/preprocess.py
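As a rough orientation, a minimal, hypothetical skeleton of such a script (illustration only, not the actual contents of ./code/preprocess.py) parses the S3 locations that the processing job passes in and reruns the preprocessing against them:

# hypothetical skeleton of a standalone PySpark preprocessing script
import argparse

from pyspark.sql import SparkSession

# the processing job passes these S3 locations as command-line arguments
# (see the spark_processor.run() call further below)
parser = argparse.ArgumentParser()
parser.add_argument("--s3_input_bucket", type=str)
parser.add_argument("--s3_input_key_prefix", type=str)
parser.add_argument("--s3_output_bucket", type=str)
parser.add_argument("--s3_output_key_prefix", type=str)
args = parser.parse_args()

spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

input_uri = "s3a://{}/{}".format(args.s3_input_bucket, args.s3_input_key_prefix)
output_uri = "s3a://{}/{}".format(args.s3_output_bucket, args.s3_output_key_prefix)

# read the raw CSV from input_uri, apply the same StringIndexer / OneHotEncoder /
# VectorAssembler steps shown earlier, split the result, and write the train and
# validation sets back under output_uri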

We’ll now use the PySparkProcessor class to define a Spark job and run it using SageMaker processing. For detailed reference, see Data Processing with Spark.

[ ]:
import logging
from time import strftime, gmtime
from sagemaker.session import Session
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
sagemaker_logger.addHandler(logging.StreamHandler())

sagemaker_session = Session()
bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()
[ ]:
# fetch the dataset from the SageMaker bucket
import boto3

s3 = boto3.client("s3")
s3.download_file(
    f"sagemaker-sample-files", "datasets/tabular/uci_abalone/abalone.csv", "abalone.csv"
)

# upload the raw input dataset to a unique S3 location
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = "sagemaker/local-pyspark/{}".format(timestamp_prefix)
input_prefix_abalone = "{}/abalone-preprocess/input".format(prefix)
input_preprocessed_prefix_abalone = "{}/abalone-preprocess/output".format(prefix)

sagemaker_session.upload_data(path="abalone.csv", bucket=bucket, key_prefix=input_prefix_abalone)

# run the processing job
spark_processor = PySparkProcessor(
    base_job_name="local-pyspark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
    tags=[{"Key": "tag-key", "Value": "tag-value"}],
)

spark_processor.run(
    submit_app="./code/preprocess.py",
    arguments=[
        "--s3_input_bucket",
        bucket,
        "--s3_input_key_prefix",
        input_prefix_abalone,
        "--s3_output_bucket",
        bucket,
        "--s3_output_key_prefix",
        input_preprocessed_prefix_abalone,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, prefix),
    logs=False,
)
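
The processing job writes Spark event logs to the spark_event_logs_s3_uri location passed above. Optionally, you can browse the Spark UI for the completed job using the SDK's history server helpers (a quick sketch; this assumes your environment can run the history server, and you should terminate it when you're done):

[ ]:
# start a local Spark history server backed by the event logs written above
spark_processor.start_history_server(
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, prefix)
)

# shut the history server down when you're finished
# spark_processor.terminate_history_server()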

Inspect the first five rows of the preprocessed output file.

[ ]:
# get output file name from S3 and print the first five records
train_output_key = ""
response = s3.list_objects_v2(Bucket=bucket, Prefix=f"{input_preprocessed_prefix_abalone}/train")

for cont in response["Contents"]:
    if cont["Key"].endswith(".csv"):
        train_output_key = cont["Key"]

if train_output_key == "":
    print("Preprocessing train file not found. Check to make sure the job ran successfully.")
else:
    print("Top 5 rows from s3://{}/{}/train/".format(bucket, input_preprocessed_prefix_abalone))
    s3.download_file(bucket, train_output_key, "train_output.csv")
    print(pd.read_csv("train_output.csv", header=None).head())

Conclusion and Cleanup

In this notebook, we installed PySpark in a Studio notebook and created a Spark session to run PySpark code locally within Studio. You can use this as a starting point to prototype your Spark code on a smaller sample of your data before running a SageMaker processing job on your entire dataset, and you can extend this example to preprocess your own data for machine learning.
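
For example, to prototype against a smaller sample before scaling out, you could work with a fraction of the rows locally (a minimal sketch using the abalone DataFrame from above):

[ ]:
# iterate on your transformations locally using roughly 10% of the rows
sample_df = abalone_df.sample(fraction=0.1, seed=42)
print(sample_df.count())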

To avoid incurring costs, remember to shut down the SageMaker Studio app, or stop the notebook instance as necessary.