Get started with SageMaker Processing

This notebook corresponds to the section “Preprocessing Data With The Built-In Scikit-Learn Container” in the blog post Amazon SageMaker Processing – Fully Managed Data Processing and Model Evaluation. It shows a lightweight example of using SageMaker Processing to split a dataset into train, validation, and test sets, which are then written back to S3.

Runtime

This notebook takes approximately 5 minutes to run.

Contents

  1. Prepare resources

  2. Download data

  3. Prepare Processing script

  4. Run Processing job

  5. Conclusion

Prepare resources

First, let’s create an SKLearnProcessor object, passing the scikit-learn version we want to use, as well as our managed infrastructure requirements.

[2]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

region = sagemaker.Session().boto_region_name
role = get_execution_role()
sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1", role=role, instance_type="ml.m5.xlarge", instance_count=1
)

Download data

Read in the raw data from a public S3 bucket. This example uses the Census-Income (KDD) Dataset from the UCI Machine Learning Repository.

Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

[3]:
import pandas as pd

s3 = boto3.client("s3")
s3.download_file(
    "sagemaker-sample-data-{}".format(region),
    "processing/census/census-income.csv",
    "census-income.csv",
)
df = pd.read_csv("census-income.csv")
df.to_csv("dataset.csv")
df.head()
[3]:
age class of worker detailed industry recode detailed occupation recode education wage per hour enroll in edu inst last wk marital stat major industry code major occupation code ... country of birth father country of birth mother country of birth self citizenship own business or self employed fill inc questionnaire for veteran's admin veterans benefits weeks worked in year year income
0 73 Not in universe 0 0 High school graduate 0 Not in universe Widowed Not in universe or children Not in universe ... United-States United-States United-States Native- Born in the United States 0 Not in universe 2 0 95 - 50000.
1 58 Self-employed-not incorporated 4 34 Some college but no degree 0 Not in universe Divorced Construction Precision production craft & repair ... United-States United-States United-States Native- Born in the United States 0 Not in universe 2 52 94 - 50000.
2 18 Not in universe 0 0 10th grade 0 High school Never married Not in universe or children Not in universe ... Vietnam Vietnam Vietnam Foreign born- Not a citizen of U S 0 Not in universe 2 0 95 - 50000.
3 9 Not in universe 0 0 Children 0 Not in universe Never married Not in universe or children Not in universe ... United-States United-States United-States Native- Born in the United States 0 Not in universe 0 0 94 - 50000.
4 10 Not in universe 0 0 Children 0 Not in universe Never married Not in universe or children Not in universe ... United-States United-States United-States Native- Born in the United States 0 Not in universe 0 0 94 - 50000.

5 rows × 42 columns

Prepare Processing script

Write the Python script that the SageMaker Processing job will run. The job stages the input file from S3 into the container at /opt/ml/processing/input; the script reads it from there, splits the rows into train, validation, and test sets, and writes the three output files to local output directories, which the job then uploads to S3.

[4]:
%%writefile preprocessing.py
import pandas as pd
import os
from sklearn.model_selection import train_test_split

input_data_path = os.path.join("/opt/ml/processing/input", "dataset.csv")
df = pd.read_csv(input_data_path)
print("Shape of data is:", df.shape)
train, test = train_test_split(df, test_size=0.2)
train, validation = train_test_split(train, test_size=0.2)

try:
    os.makedirs("/opt/ml/processing/output/train")
    os.makedirs("/opt/ml/processing/output/validation")
    os.makedirs("/opt/ml/processing/output/test")
    print("Successfully created directories")
except Exception as e:
    # The directories may already exist (the Processing job can create them),
    # or they may not be creatable at all; either way, log and continue.
    print(e)
    print("Could not make directories")

try:
    train.to_csv("/opt/ml/processing/output/train/train.csv")
    validation.to_csv("/opt/ml/processing/output/validation/validation.csv")
    test.to_csv("/opt/ml/processing/output/test/test.csv")
    print("Wrote files successfully")
except Exception as e:
    print("Failed to write the files")
    print(e)

print("Completed running the processing job")
Writing preprocessing.py
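The two chained 80/20 calls to train_test_split above yield an overall split of roughly 64% train, 16% validation, and 20% test. A quick arithmetic sketch of the resulting sizes (the row count is taken from the shape the job logs for this dataset; exact counts may differ by a row or two depending on rounding inside train_test_split):

```python
# Two chained 80/20 splits: compute the approximate train/validation/test sizes.
n = 199_523  # row count of the census dataset

train_plus_val = round(n * 0.8)           # first split keeps 80% for train+validation
test = n - train_plus_val                 # the remaining 20% becomes the test set
validation = round(train_plus_val * 0.2)  # second split takes 20% of that 80%
train = train_plus_val - validation

print(train, validation, test)
print(round(train / n, 2), round(validation / n, 2), round(test / n, 2))  # ~0.64 / 0.16 / 0.2
```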

Run Processing job

Run the Processing job, specifying the script, the input file, and the output directories.

[5]:
%%capture output

from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor.run(
    code="preprocessing.py",
    # arguments = ["arg1", "arg2"], # Arguments can optionally be specified here
    inputs=[ProcessingInput(source="dataset.csv", destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output/train"),
        ProcessingOutput(source="/opt/ml/processing/output/validation"),
        ProcessingOutput(source="/opt/ml/processing/output/test"),
    ],
)

Get the Processing job logs and retrieve the job name.

[6]:
print(output)
job_name = str(output).split("\n")[1].split(" ")[-1]

Job Name:  sagemaker-scikit-learn-2022-04-18-00-09-00-899
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/sagemaker-scikit-learn-2022-04-18-00-09-00-899/input/input-1/dataset.csv', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/sagemaker-scikit-learn-2022-04-18-00-09-00-899/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/sagemaker-scikit-learn-2022-04-18-00-09-00-899/output/output-1', 'LocalPath': '/opt/ml/processing/output/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'output-2', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/sagemaker-scikit-learn-2022-04-18-00-09-00-899/output/output-2', 'LocalPath': '/opt/ml/processing/output/validation', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'output-3', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-000000000000/sagemaker-scikit-learn-2022-04-18-00-09-00-899/output/output-3', 'LocalPath': '/opt/ml/processing/output/test', 'S3UploadMode': 'EndOfJob'}}]
...........................
Shape of data is: (199523, 43)
[Errno 17] File exists: '/opt/ml/processing/output/train'
Could not make directories
Wrote files successfully
Completed running the processing job
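Splitting the captured text on newlines and spaces, as in the cell above, breaks if the log layout shifts even slightly. A regex over the same text is a little more forgiving; a sketch against the "Job Name:" line format shown above:

```python
import re

# First lines of the captured job description, in the format printed above.
captured = (
    "\nJob Name:  sagemaker-scikit-learn-2022-04-18-00-09-00-899\n"
    "Inputs:  [...]\n"
)

# Capture the first non-whitespace token after "Job Name:".
match = re.search(r"Job Name:\s+(\S+)", captured)
job_name = match.group(1) if match else None
print(job_name)  # sagemaker-scikit-learn-2022-04-18-00-09-00-899
```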

Confirm that the output dataset files were written to S3.

[7]:
import boto3

s3_client = boto3.client("s3")
default_bucket = sagemaker.Session().default_bucket()
for i in range(1, 4):
    prefix = s3_client.list_objects(
        Bucket=default_bucket, Prefix=job_name + "/output/output-" + str(i) + "/"
    )["Contents"][0]["Key"]
    print("s3://" + default_bucket + "/" + prefix)
s3://sagemaker-us-west-2-000000000000/sagemaker-scikit-learn-2022-04-18-00-09-00-899/output/output-1/train.csv
s3://sagemaker-us-west-2-000000000000/sagemaker-scikit-learn-2022-04-18-00-09-00-899/output/output-2/validation.csv
s3://sagemaker-us-west-2-000000000000/sagemaker-scikit-learn-2022-04-18-00-09-00-899/output/output-3/test.csv
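When outputs are left unnamed, the job uploads them under output-N prefixes below <default bucket>/<job name>/output/, as the listing above shows. Assuming that default layout holds, the URIs can also be composed directly without listing S3 (a sketch, not an official SageMaker API):

```python
def processing_output_uri(bucket: str, job_name: str, index: int, filename: str) -> str:
    """Compose the default S3 URI for the Nth unnamed Processing output file."""
    return f"s3://{bucket}/{job_name}/output/output-{index}/{filename}"

# Reconstruct the train-set URI printed above from its parts.
uri = processing_output_uri(
    "sagemaker-us-west-2-000000000000",
    "sagemaker-scikit-learn-2022-04-18-00-09-00-899",
    1,
    "train.csv",
)
print(uri)
```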

Conclusion

In this notebook, we read a dataset from S3 and processed it into train, test, and validation sets using a SageMaker Processing job. You can extend this example for preprocessing your own datasets in preparation for machine learning or other applications.