Parameterize Spark configuration in pipeline PySparkProcessor execution




Overview

In this example, we demonstrate how to parameterize the Spark configuration across different pipeline PySparkProcessor executions. It extends the "Specifying additional Spark configuration" example in Distributed Data Processing using Apache Spark and SageMaker Processing. We create a simple pipeline with a single processing step to show how the Spark configuration of a SageMaker pipeline PySparkProcessor can be parameterized. This is useful for pipeline users who want to supply a different Spark configuration to each pipeline PySparkProcessor execution.

Prerequisites

To learn how to create a pipeline, follow this tutorial.

Pipeline Creation

The following steps demonstrate the parameterization capabilities of the pipeline PySparkProcessor.

[ ]:
!pip install -U "sagemaker>2.0"

Restart your notebook kernel after upgrading the SDK.

[2]:
import boto3
import sagemaker

from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker_session = PipelineSession()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
region = sagemaker.Session().boto_region_name

Create a unique, timestamped S3 prefix for the parametrize-spark-config-pysparkprocessor-demo.

[3]:
from time import gmtime, strftime

# Build a unique, timestamped S3 prefix for this demo's inputs and outputs
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = "sagemaker/parametrize-spark-config-pysparkprocessor-demo/{}".format(timestamp_prefix)

In this example, we process the Abalone Data Set using a PySpark script. We download the data locally and then upload it to our Amazon S3 bucket for processing.

[ ]:
!mkdir -p data
local_path = "./data/abalone-dataset.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-example-files-prod-{region}").download_file(
    "datasets/tabular/uci_abalone/abalone.csv", local_path
)

input_prefix_abalone = "{}/input/raw/abalone".format(prefix)
input_preprocessed_prefix_abalone = "{}/input/preprocessed/abalone".format(prefix)

sagemaker_session.upload_data(
    path=local_path, bucket=default_bucket, key_prefix=input_prefix_abalone
)
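The preprocessing script reads the dataset by a fixed file name, so it helps to see where the upload places the object. The following is a minimal sketch, assuming that `upload_data` stores a single file under the key prefix joined with the file's base name. `expected_s3_uri` is a hypothetical helper, and the bucket and prefix in the usage line are placeholders.

```python
import posixpath


def expected_s3_uri(bucket, key_prefix, local_path):
    """Return the S3 URI where a single uploaded file is expected to land."""
    # Only the file name survives the upload; the local directory is dropped.
    file_name = posixpath.basename(local_path)
    # S3 keys always use forward slashes, so join with posixpath.
    key = posixpath.join(key_prefix, file_name)
    return "s3://{}/{}".format(bucket, key)


# Placeholder bucket and prefix, for illustration only.
print(expected_s3_uri("my-bucket", "demo/input/raw/abalone", "./data/abalone-dataset.csv"))
```

Because the local file is named abalone-dataset.csv, the object key ends with that name, which is the name the preprocessing script expects under its input key prefix.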

Upload the default Spark configuration to Amazon S3.

[5]:
import json


def upload_to_s3(bucket, prefix, body):
    s3_object = s3.Object(bucket, prefix)
    s3_object.put(Body=body)


default_spark_configuration = [
    {
        "Classification": "spark-defaults",
        "Properties": {"spark.executor.memory": "2g", "spark.executor.cores": "1"},
    }
]
default_spark_conf_prefix = "{}/spark/conf/cores_1/configuration.json".format(prefix)
default_spark_configuration_object_s3_uri = "s3://{}/{}".format(
    default_bucket, default_spark_conf_prefix
)

upload_to_s3(default_bucket, default_spark_conf_prefix, json.dumps(default_spark_configuration))
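A malformed configuration file only surfaces as a failure once the processing job starts, so a quick local check before uploading can save a pipeline run. The following is a minimal sketch. `validate_spark_configuration` is a hypothetical helper, and its rules (a non-empty list of entries, each with a string Classification and a Properties dict of string values) simply mirror the shape used in this notebook, which may be stricter than the service requires.

```python
import json


def validate_spark_configuration(configuration):
    """Return the JSON body for a configuration, or raise ValueError if its shape is off."""
    if not isinstance(configuration, list) or not configuration:
        raise ValueError("configuration must be a non-empty list")
    for entry in configuration:
        if not isinstance(entry, dict):
            raise ValueError("each entry must be a dict")
        if not isinstance(entry.get("Classification"), str):
            raise ValueError("each entry needs a string 'Classification'")
        properties = entry.get("Properties")
        if not isinstance(properties, dict):
            raise ValueError("each entry needs a 'Properties' dict")
        for key, value in properties.items():
            # Assumption: values are written as strings, as in this notebook ("1", "2g").
            if not isinstance(key, str) or not isinstance(value, str):
                raise ValueError("property {!r} must map a string to a string".format(key))
    # Serialize only after every entry has passed the checks.
    return json.dumps(configuration)


body = validate_spark_configuration(
    [
        {
            "Classification": "spark-defaults",
            "Properties": {"spark.executor.memory": "2g", "spark.executor.cores": "1"},
        }
    ]
)
```

The returned string could then be passed as the body argument of upload_to_s3 in place of calling json.dumps directly.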

If no SparkConfigS3Uri is provided to a pipeline execution, the pipeline falls back to the pre-uploaded default_spark_configuration.

[6]:
from sagemaker.workflow.parameters import ParameterString

spark_config_s3_uri = ParameterString(
    name="SparkConfigS3Uri",
    default_value=default_spark_configuration_object_s3_uri,
)
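The fallback behaviour can be pictured as a merge of declared defaults with the values supplied at execution time. The following is a conceptual sketch in plain Python, not the SageMaker implementation. `resolve_parameters` is a hypothetical helper, the S3 URIs are placeholders, and rejecting undeclared names is a design choice of this sketch rather than a statement about the service.

```python
def resolve_parameters(declared_defaults, overrides=None):
    """Return the effective parameter values for one execution."""
    overrides = overrides or {}
    # Sketch-level choice: refuse names that the pipeline never declared.
    unknown = set(overrides) - set(declared_defaults)
    if unknown:
        raise KeyError("undeclared parameters: {}".format(sorted(unknown)))
    # Later keys win, so supplied values replace the declared defaults.
    return {**declared_defaults, **overrides}


# Placeholder URIs, for illustration only.
defaults = {"SparkConfigS3Uri": "s3://my-bucket/demo/spark/conf/cores_1/configuration.json"}

# No override: the declared default is used.
print(resolve_parameters(defaults))

# Override: the supplied URI replaces the default for this execution only.
print(
    resolve_parameters(
        defaults,
        {"SparkConfigS3Uri": "s3://my-bucket/demo/spark/conf/cores_2/configuration.json"},
    )
)
```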

We create a PySpark script similar to this example. The source of the preprocessing script is in the cell below. The script performs basic feature engineering on the raw input dataset, here the Abalone Data Set: it applies string indexing, one-hot encoding, and vector assembly, and combines them into a pipeline so that the transformations run in order. It then does an 80-20 split to produce training and validation datasets as output.

[7]:
!mkdir -p code
[ ]:
%%writefile ./code/preprocess.py
import argparse
import os

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
)


def extract(row):
    return (row[0],) + tuple(row[1].toArray().tolist())


def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket")
    parser.add_argument("--s3_input_key_prefix", type=str, help="s3 input key prefix")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

    # Defining the schema corresponding to the input data. The input data does not contain the headers
    schema = StructType(
        [
            StructField("sex", StringType(), True),
            StructField("length", DoubleType(), True),
            StructField("diameter", DoubleType(), True),
            StructField("height", DoubleType(), True),
            StructField("whole_weight", DoubleType(), True),
            StructField("shucked_weight", DoubleType(), True),
            StructField("viscera_weight", DoubleType(), True),
            StructField("shell_weight", DoubleType(), True),
            StructField("rings", DoubleType(), True),
        ]
    )

    # Downloading the data from S3 into a Dataframe
    total_df = spark.read.csv(
        (
            "s3://"
            + os.path.join(args.s3_input_bucket, args.s3_input_key_prefix, "abalone-dataset.csv")
        ),
        header=False,
        schema=schema,
    )

    # StringIndexer on the sex column which has categorical value
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")

    # one-hot-encoding is being performed on the string-indexed sex column (indexed_sex)
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

    # vector-assembler will bring all the features to a 1D vector for us to save easily into CSV format
    assembler = VectorAssembler(
        inputCols=[
            "sex_vec",
            "length",
            "diameter",
            "height",
            "whole_weight",
            "shucked_weight",
            "viscera_weight",
            "shell_weight",
        ],
        outputCol="features",
    )

    # The pipeline is comprised of the steps added above
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])

    # This step trains the feature transformers
    model = pipeline.fit(total_df)

    # This step transforms the dataset with information obtained from the previous fit
    transformed_total_df = model.transform(total_df)

    # Split the overall dataset into 80-20 training and validation
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])

    # Convert the train dataframe to RDD to save in CSV format and upload to S3
    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))

    train_rdd.map(extract).toDF().write.mode("overwrite").option("header", False).csv(
        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "train")
    )

    # Convert the validation dataframe to RDD to save in CSV format and upload to S3
    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))

    validation_rdd.map(extract).toDF().write.mode("overwrite").option("header", False).csv(
        "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "validation")
    )


if __name__ == "__main__":
    main()
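To make the feature engineering in preprocess.py concrete, the following plain-Python sketch imitates what the three stages do to a handful of rows. It is an illustration, not the Spark implementation, and it assumes the default settings of StringIndexer (most frequent label gets index 0, ties broken alphabetically) and OneHotEncoder (the last category is dropped). All function names and sample values are hypothetical.

```python
from collections import Counter


def string_index(values):
    """Map each label to an index, most frequent label first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """Encode an index as num_categories - 1 values; the last category is all zeros."""
    vector = [0.0] * (num_categories - 1)
    if int(index) < num_categories - 1:
        vector[int(index)] = 1.0
    return vector


def assemble(sex_vector, numeric_features):
    """Concatenate the encoded column and the numeric columns into one flat vector."""
    return list(sex_vector) + list(numeric_features)


# Made-up sample of the categorical column, for illustration only.
sexes = ["M", "F", "I", "M", "I", "M"]
mapping = string_index(sexes)

for sex in ["M", "I", "F"]:
    encoded = one_hot_drop_last(mapping[sex], len(mapping))
    print(sex, mapping[sex], assemble(encoded, [0.455, 0.365]))
```

With three distinct values in the sex column, the encoded column contributes two values, so each output row written by the script would hold the rings label followed by nine feature values.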

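Create a PySparkProcessor and call its run() method to produce the arguments for the processing step. The Spark configuration is passed as a ProcessingInput whose source is the SparkConfigS3Uri pipeline parameter.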

[ ]:
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.processing import ProcessingInput
from sagemaker.spark.processing import _SparkProcessorBase

pyspark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
    sagemaker_session=sagemaker_session,
)

step_args = pyspark_processor.run(
    "./code/preprocess.py",
    inputs=[
        ProcessingInput(
            source=spark_config_s3_uri,
            destination=f"{pyspark_processor._conf_container_base_path}{pyspark_processor._conf_container_input_name}",
            input_name=_SparkProcessorBase._conf_container_input_name,
        )
    ],
    arguments=[
        "--s3_input_bucket",
        default_bucket,
        "--s3_input_key_prefix",
        input_prefix_abalone,
        "--s3_output_bucket",
        default_bucket,
        "--s3_output_key_prefix",
        input_preprocessed_prefix_abalone,
    ],
)
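The destination above is built from private attributes of the SDK, so it is worth spelling out what it resolves to. The following sketch uses constants that are assumptions about the current values of `_conf_container_base_path`, `_conf_container_input_name`, and `_conf_file_name`. They are not a public contract and should be checked against the installed SDK version. The helper names are hypothetical.

```python
import posixpath

# Assumed values of the private SDK attributes; verify them for your SDK version.
CONF_CONTAINER_BASE_PATH = "/opt/ml/processing/input/"
CONF_CONTAINER_INPUT_NAME = "conf"
CONF_FILE_NAME = "configuration.json"


def conf_destination(base_path, input_name):
    """Container directory that receives the configuration input."""
    # Same concatenation as the f-string used for the ProcessingInput destination.
    return "{}{}".format(base_path, input_name)


def conf_file_path(base_path, input_name, file_name):
    """Full path of the configuration file inside the container."""
    return posixpath.join(conf_destination(base_path, input_name), file_name)


print(conf_destination(CONF_CONTAINER_BASE_PATH, CONF_CONTAINER_INPUT_NAME))
print(conf_file_path(CONF_CONTAINER_BASE_PATH, CONF_CONTAINER_INPUT_NAME, CONF_FILE_NAME))
```

Under these assumptions, the S3 object has to be named configuration.json for the container to find it, which would explain why every configuration in this notebook is uploaded under that file name.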

Create a processing step from step_args, which already carry the PySparkProcessor, the configuration input, and the ./code/preprocess.py script that we created. Then create the pipeline with SparkConfigS3Uri as its parameter.

[ ]:
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline

spark_step_process = ProcessingStep(name="AbaloneSparkProcess", step_args=step_args)

pipeline_name = "AbalonePipeline-Spark"
pipeline = Pipeline(
    name=pipeline_name, parameters=[spark_config_s3_uri], steps=[spark_step_process]
)

pipeline.upsert(role_arn=role)

We have now created a SageMaker pipeline with a PySparkProcessor step.

Pipeline Executions

Execute pipeline with default spark-configuration

If no SparkConfigS3Uri parameter value is provided, the pipeline execution uses the configuration at default_spark_configuration_object_s3_uri. In the following example, we run the PySparkProcessor with this default Spark configuration.

[11]:
# Execute pipeline with default pre-uploaded spark-config
execution_with_default_spark_configuration = pipeline.start()

# Describe the pipeline execution.
execution_with_default_spark_configuration.describe()
[ ]:
# Wait for the execution to complete.
execution_with_default_spark_configuration.wait()

# List the steps in the execution.
execution_with_default_spark_configuration.list_steps()

We can verify that PySparkProcessor is using the default spark-configuration by looking into the CloudWatch logs.

[Image: default configuration]

Execute pipeline with a new spark-configuration

We upload a new Spark configuration to Amazon S3 and use it in the next pipeline execution.

[12]:
spark_configurations_cores_2 = [
    {
        "Classification": "spark-defaults",
        "Properties": {"spark.executor.memory": "2g", "spark.executor.cores": "2"},
    }
]

spark_conf_prefix = "{}/spark/conf/cores_2/configuration.json".format(prefix)
spark_configuration_object_s3_uri = "s3://{}/{}".format(default_bucket, spark_conf_prefix)
upload_to_s3(default_bucket, spark_conf_prefix, json.dumps(spark_configurations_cores_2))
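The same pattern extends to more than two configurations. The following is a minimal sketch that prepares one configuration body per executor core count, following the key layout used in this notebook. `build_core_sweep` is a hypothetical helper, and the prefix in the usage lines is a placeholder.

```python
import json


def build_core_sweep(prefix, core_counts, executor_memory="2g"):
    """Return {s3_key: json_body} with one spark-defaults configuration per core count."""
    objects = {}
    for cores in core_counts:
        configuration = [
            {
                "Classification": "spark-defaults",
                "Properties": {
                    "spark.executor.memory": executor_memory,
                    # Property values are written as strings, as elsewhere in this notebook.
                    "spark.executor.cores": str(cores),
                },
            }
        ]
        key = "{}/spark/conf/cores_{}/configuration.json".format(prefix, cores)
        objects[key] = json.dumps(configuration)
    return objects


# Placeholder prefix, for illustration only.
for key, body in build_core_sweep("demo-prefix", [1, 2, 4]).items():
    print(key, body)
```

Each key and body pair could be uploaded with upload_to_s3, and the resulting S3 URI passed as SparkConfigS3Uri when starting an execution.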
[13]:
# Execute pipeline with newly uploaded spark-config
execution_spark_conf_spark_executor_cores_2 = pipeline.start(
    parameters=dict(
        SparkConfigS3Uri=spark_configuration_object_s3_uri,
    )
)

# Describe the pipeline execution.
execution_spark_conf_spark_executor_cores_2.describe()
[ ]:
# Wait for the execution to complete.
execution_spark_conf_spark_executor_cores_2.wait()

# List the steps in the execution.
execution_spark_conf_spark_executor_cores_2.list_steps()

We can verify that PySparkProcessor is using the newly provided spark-configuration by looking into the CloudWatch logs.

[Image: default configuration]
