SageMaker Data Wrangler Job Notebook

This notebook uses the Data Wrangler .flow file to submit a SageMaker Data Wrangler Job with the following steps:

  • Push Data Wrangler .flow file to S3

  • Parse the .flow file inputs, and create the argument dictionary to submit to a boto client

  • Submit the ProcessingJob arguments and wait for Job completion

Optionally, the notebook also gives an example of starting a SageMaker XGBoost TrainingJob using the newly processed data.

[ ]:
%pip install sagemaker --upgrade
[ ]:
import json
import os
import time
import uuid

import boto3
import sagemaker

Parameters

The following cell contains the configurable parameters used throughout this notebook.

[ ]:
# S3 bucket for saving processing job outputs
# Feel free to specify a different bucket here if you wish.
sess = sagemaker.Session()
region = sess.boto_region_name
bucket = sess.default_bucket()
prefix = "data_wrangler_flows"
flow_id = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
flow_name = f"flow-{flow_id}"
flow_uri = f"s3://{bucket}/{prefix}/{flow_name}.flow"

flow_file_name = "prm.flow"

iam_role = sagemaker.get_execution_role()

container_uri = sagemaker.image_uris.retrieve(framework="data-wrangler", region=region)

# Processing Job Resources Configurations
# Data Wrangler processing jobs currently support only a single instance.
instance_count = 1
instance_type = "ml.m5.4xlarge"

# Processing Job Path URI Information
dw_output_prefix = f"export-{flow_name}/output"
%store dw_output_prefix
output_path = f"s3://{bucket}/{dw_output_prefix}"
output_name = "ff586e7b-a02d-472b-91d4-da3dd05d7a30.default"

processing_job_name = f"data-wrangler-flow-processing-{flow_id}"

processing_dir = "/opt/ml/processing"

# Modify the variable below to specify the content type used when writing each output.
# Currently supported options are 'CSV' and 'PARQUET'; the default is 'CSV'.
output_content_type = "CSV"

# URL to use for sagemaker client.
# If this is None, boto will automatically construct the appropriate URL to use
# when communicating with sagemaker.
sagemaker_endpoint_url = None

Upload Data

Upload the dataset that we will be transforming and training on to Amazon S3.

[ ]:
! mkdir -p data
[ ]:
s3_client = boto3.client("s3", region_name=region)
# download dataset from public S3 bucket
s3_client.download_file(
    "sagemaker-sample-files",
    "datasets/tabular/fleet-predictive-maintenance/example_fleet_info.csv",
    "data/example_fleet_info.csv",
)
s3_client.download_file(
    "sagemaker-sample-files",
    "datasets/tabular/fleet-predictive-maintenance/example_fleet_sensor_logs.csv",
    "data/example_fleet_sensor_logs.csv",
)

# upload data to your own S3 bucket
fleet_info_filename = "example_fleet_info.csv"
sensor_logs_filename = "example_fleet_sensor_logs.csv"

s3_client.upload_file(
    Filename=f"data/{fleet_info_filename}",
    Bucket=bucket,
    Key=f"{prefix}/data/{fleet_info_filename}",
)
s3_client.upload_file(
    Filename=f"data/{sensor_logs_filename}",
    Bucket=bucket,
    Key=f"{prefix}/data/{sensor_logs_filename}",
)

fleet_info_uri = f"s3://{bucket}/{prefix}/data/{fleet_info_filename}"
sensor_logs_uri = f"s3://{bucket}/{prefix}/data/{sensor_logs_filename}"

Push Flow to S3

Use the following cell to upload the Data Wrangler .flow file to Amazon S3 so that it can be used as an input to the processing job.

[ ]:
# Load the .flow file
with open(flow_file_name) as f:
    flow = json.load(f)

# Replace the original S3 locations with our own S3 locations
new_nodes = []
for node in flow["nodes"]:
    if node["type"] == "SOURCE":
        if node["parameters"]["dataset_definition"]["name"] == fleet_info_filename:
            node["parameters"]["dataset_definition"]["s3ExecutionContext"][
                "s3Uri"
            ] = fleet_info_uri
        elif node["parameters"]["dataset_definition"]["name"] == sensor_logs_filename:
            node["parameters"]["dataset_definition"]["s3ExecutionContext"][
                "s3Uri"
            ] = sensor_logs_uri
    new_nodes.append(node)

flow["nodes"] = new_nodes

# Write the updated flow back to disk
with open(flow_file_name, "w") as f:
    json.dump(flow, f)

# Upload to S3
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name, bucket, f"{prefix}/{flow_name}.flow")

print(f"Data Wrangler flow file uploaded to {flow_uri}")

Create boto3 Processing Job arguments

This notebook submits a Processing Job using boto3, which requires an argument dictionary to pass to the client. Below, utility methods are defined for creating Processing Job inputs from the following source types: S3, Athena, and Redshift. The argument dictionary is then generated from the parsed inputs and job configurations such as instance type.

[ ]:
def create_flow_notebook_processing_input(base_dir, flow_s3_uri):
    return {
        "InputName": "flow",
        "S3Input": {
            "LocalPath": f"{base_dir}/flow",
            "S3Uri": flow_s3_uri,
            "S3DataType": "S3Prefix",
            "S3InputMode": "File",
        },
    }


def create_s3_processing_input(base_dir, name, dataset_definition):
    return {
        "InputName": name,
        "S3Input": {
            "LocalPath": f"{base_dir}/{name}",
            "S3Uri": dataset_definition["s3ExecutionContext"]["s3Uri"],
            "S3DataType": "S3Prefix",
            "S3InputMode": "File",
        },
    }


def create_redshift_processing_input(base_dir, name, dataset_definition):
    return {
        "InputName": name,
        "DatasetDefinition": {
            "RedshiftDatasetDefinition": {
                "ClusterId": dataset_definition["clusterIdentifier"],
                "Database": dataset_definition["database"],
                "DbUser": dataset_definition["dbUser"],
                "QueryString": dataset_definition["queryString"],
                "ClusterRoleArn": dataset_definition["unloadIamRole"],
                "OutputS3Uri": f'{dataset_definition["s3OutputLocation"]}{name}/',
                "OutputFormat": dataset_definition["outputFormat"].upper(),
            },
            "LocalPath": f"{base_dir}/{name}",
        },
    }


def create_athena_processing_input(base_dir, name, dataset_definition):
    return {
        "InputName": name,
        "DatasetDefinition": {
            "AthenaDatasetDefinition": {
                "Catalog": dataset_definition["catalogName"],
                "Database": dataset_definition["databaseName"],
                "QueryString": dataset_definition["queryString"],
                "OutputS3Uri": f'{dataset_definition["s3OutputLocation"]}{name}/',
                "OutputFormat": dataset_definition["outputFormat"].upper(),
            },
            "LocalPath": f"{base_dir}/{name}",
        },
    }


def create_processing_inputs(processing_dir, flow, flow_uri):
    """Helper function for creating processing inputs
    :param processing_dir: local processing directory inside the container
    :param flow: loaded Data Wrangler flow
    :param flow_uri: S3 URI of the Data Wrangler flow file
    """
    processing_inputs = []
    flow_processing_input = create_flow_notebook_processing_input(processing_dir, flow_uri)
    processing_inputs.append(flow_processing_input)

    for node in flow["nodes"]:
        if "dataset_definition" in node["parameters"]:
            data_def = node["parameters"]["dataset_definition"]
            name = data_def["name"]
            source_type = data_def["datasetSourceType"]

            if source_type == "S3":
                s3_processing_input = create_s3_processing_input(processing_dir, name, data_def)
                processing_inputs.append(s3_processing_input)
            elif source_type == "Athena":
                athena_processing_input = create_athena_processing_input(
                    processing_dir, name, data_def
                )
                processing_inputs.append(athena_processing_input)
            elif source_type == "Redshift":
                redshift_processing_input = create_redshift_processing_input(
                    processing_dir, name, data_def
                )
                processing_inputs.append(redshift_processing_input)
            else:
                raise ValueError(f"{source_type} is not supported for Data Wrangler Processing.")
    return processing_inputs


def create_container_arguments(output_name, output_content_type):
    output_config = {output_name: {"content_type": output_content_type}}
    return [f"--output-config '{json.dumps(output_config)}'"]


# Create Processing Job Arguments
processing_job_arguments = {
    "AppSpecification": {
        "ContainerArguments": create_container_arguments(output_name, output_content_type),
        "ImageUri": container_uri,
    },
    "ProcessingInputs": create_processing_inputs(processing_dir, flow, flow_uri),
    "ProcessingOutputConfig": {
        "Outputs": [
            {
                "OutputName": output_name,
                "S3Output": {
                    "S3Uri": output_path,
                    "LocalPath": os.path.join(processing_dir, "output"),
                    "S3UploadMode": "EndOfJob",
                },
            },
        ],
    },
    "ProcessingJobName": processing_job_name,
    "ProcessingResources": {
        "ClusterConfig": {
            "InstanceCount": instance_count,
            "InstanceType": instance_type,
            "VolumeSizeInGB": 30,
        }
    },
    "RoleArn": iam_role,
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 86400,
    },
}

Start ProcessingJob

Now the Processing Job is submitted via the boto3 client. The notebook polls the job status with the client and waits until the job is no longer 'InProgress'.

[ ]:
sagemaker_client = boto3.client("sagemaker", endpoint_url=sagemaker_endpoint_url)
create_response = sagemaker_client.create_processing_job(**processing_job_arguments)

status = sagemaker_client.describe_processing_job(ProcessingJobName=processing_job_name)

while status["ProcessingJobStatus"] == "InProgress":
    status = sagemaker_client.describe_processing_job(ProcessingJobName=processing_job_name)
    print(status["ProcessingJobStatus"])
    time.sleep(60)

print(status)
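Note that the loop above exits as soon as the job leaves 'InProgress', which includes the failure states. As a minimal sketch (a hypothetical helper, not part of the original notebook), a final check like the following can fail fast on anything other than 'Completed':

```python
def check_job_status(describe_response):
    """Raise if the final ProcessingJob status is not 'Completed'.

    `describe_response` is a describe_processing_job result; only the
    'ProcessingJobStatus' and optional 'FailureReason' keys are used.
    """
    final = describe_response["ProcessingJobStatus"]
    if final != "Completed":
        reason = describe_response.get("FailureReason", "no failure reason reported")
        raise RuntimeError(f"Processing job ended with status {final}: {reason}")
    return final
```

For example, `check_job_status(status)` after the loop raises a descriptive error instead of letting a failed job pass silently into the training step.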

Kick off SageMaker Training Job (Optional)

Data Wrangler is a SageMaker tool for processing data to be used for machine learning. Now that the data has been processed, you may want to train a model on it. The following shows an example using the popular XGBoost algorithm.

It is important to note that the following XGBoost objective (binary, regression, or multiclass), hyperparameters, and content_type may not be suitable for your output data and may require changes to train a proper model. Furthermore, for CSV training, the algorithm assumes that the target variable is in the first column. For more information on SageMaker XGBoost, see https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html.
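Because SageMaker XGBoost expects the target variable in the first column for CSV input, you may need to reorder columns before training. A minimal pure-Python sketch (the column name "label" below is a hypothetical stand-in for your actual target column):

```python
def move_target_first(header, rows, target_column):
    """Return (header, rows) with target_column moved to position 0."""
    idx = header.index(target_column)
    reorder = lambda r: [r[idx]] + r[:idx] + r[idx + 1:]
    return reorder(header), [reorder(r) for r in rows]

# Hypothetical example data with the target in the middle column.
header = ["feature_a", "label", "feature_b"]
rows = [["1", "0", "3.0"], ["2", "1", "4.0"]]
header, rows = move_target_first(header, rows, "label")
print(header)  # ['label', 'feature_a', 'feature_b']
```

When writing the reordered rows out for CSV training, also drop the header row, since the algorithm expects headerless CSV input.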

Find Training Data path

The cell below lists the objects under the output prefix to find the location of the processed training data.

[ ]:
s3_client = boto3.client("s3")
list_response = s3_client.list_objects_v2(Bucket=bucket, Prefix=dw_output_prefix)
print(list_response)
training_path = None

for content in list_response["Contents"]:
    if "_SUCCESS" not in content["Key"]:
        training_path = content["Key"]

print(training_path)

Next, the Training Job hyperparameters are set. For more information on XGBoost Hyperparameters, see https://xgboost.readthedocs.io/en/latest/parameter.html.

[ ]:
region = boto3.Session().region_name
container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-1")
hyperparameters = {
    "max_depth": "5",
    "objective": "reg:squarederror",
    "num_round": "10",
}
train_content_type = (
    "application/x-parquet" if output_content_type.upper() == "PARQUET" else "text/csv"
)
train_input = sagemaker.inputs.TrainingInput(
    s3_data=f"s3://{bucket}/{training_path}",
    content_type=train_content_type,
)

The TrainingJob configuration is set using the SageMaker Python SDK Estimator, which is then fit using the training data produced by the ProcessingJob run earlier.

[ ]:
estimator = sagemaker.estimator.Estimator(
    container,
    iam_role,
    hyperparameters=hyperparameters,
    instance_count=1,
    instance_type="ml.m5.2xlarge",
)
estimator.fit({"train": train_input})