Amazon SageMaker Feature Store: Feature Processor Introduction



This notebook demonstrates how to get started with Feature Processor using the SageMaker Python SDK: creating feature groups, performing batch transformations, and ingesting the processed data into feature groups.

We first show how to use the @feature_processor decorator to run the job locally, then how to add the @remote decorator to run large batch transformations and ingestion remotely as a SageMaker training job. The SDK also provides APIs to create scheduled pipelines from the transformation code.

To learn more about Feature Processor, see the Feature Processing documentation for more information and examples.

Setup For Notebook

Setup Runtime Environment

First, create a new kernel to execute this notebook:

  1. Launch a new terminal in the current image (the ’$_’ icon at the top of this notebook).
  2. Execute the commands:

conda create --name feature-processing-py-3.9 python=3.9 -y
conda activate feature-processing-py-3.9
conda install ipykernel -y
conda install openjdk -y

  3. Return to this notebook and select the kernel with Image: ‘Data Science’ and Kernel: ‘feature-processing-py-3.9’.

Alternatively, if you run this notebook on SageMaker Studio, you can execute the following cell to install the runtime dependencies.

[ ]:
%%capture

!apt-get update
!apt-get install openjdk-11-jdk -y
%pip install ipykernel

To get the Feature Processor module, reinstall the SageMaker Python SDK with its extra dependencies.

[ ]:
%pip install 'sagemaker[feature-processor]' --force-reinstall
[ ]:
"""
Restart the kernel.
"""
from IPython.display import display_html

display_html("<script>Jupyter.notebook.kernel.restart()</script>", raw=True)
[ ]:
"""
Cell magic that saves the cell's code to a file and also executes the cell. The saved file is used later to create a Lineage artifact for the code.
"""
from IPython.core.magic import register_cell_magic


@register_cell_magic
def write_and_execute(line, cell):
    argz = line.split()
    file = argz[-1]
    mode = "w"
    if len(argz) == 2 and argz[0] == "-a":
        mode = "a"
    with open(file, mode) as f:
        f.write(cell)
    get_ipython().run_cell(cell)
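
The argument handling of this magic can be checked outside IPython. The sketch below mirrors the parsing logic of write_and_execute as a plain function. Note that parse_magic_line is a hypothetical helper name introduced only for illustration; it is not part of the notebook or of IPython.

```python
def parse_magic_line(line):
    """Return (file_name, mode) the way write_and_execute reads its argument line."""
    args = line.split()
    file_name = args[-1]
    # "-a <file>" appends to the file; anything else overwrites it.
    mode = "a" if len(args) == 2 and args[0] == "-a" else "w"
    return file_name, mode


overwrite = parse_magic_line("car-data-ingestion.py")
append = parse_magic_line("-a car-data-ingestion.py")
print(overwrite, append)
```

In other words, `%%write_and_execute -a car-data-ingestion.py` would append the cell to the file instead of replacing its contents.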

Create Feature Groups

We start by creating two feature groups. The first stores the raw car sales dataset located in data/car_data.csv. The second stores the aggregated feature values produced by feature processing, for example the average and maximum mileage, price and MSRP.

[ ]:
"""
Set up feature groups.
"""

import boto3, time
import sagemaker
from sagemaker.s3 import S3Uploader
from sagemaker import get_execution_role
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)


sagemaker_session = sagemaker.Session()
sagemaker_client = boto3.client("sagemaker")
sagemaker_featurestore_runtime_client = boto3.client("sagemaker-featurestore-runtime")

aws_account_id = sagemaker_session.account_id()
region = sagemaker_session.boto_region_name

s3_bucket = sagemaker_session.default_bucket()
s3_prefix = "feature-store/feature-processing"

s3_data_prefix = f"{s3_prefix}/data-sets"
s3_offline_store_prefix = f"{s3_prefix}/offline-store"
offline_store_role = get_execution_role()

"""
Feature Group Definitions.
"""
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum

# S3 Data Source - Car Sales (the local CSV file is uploaded to S3)
CAR_SALES_DATA_DIR = "./data/car_data.csv"
RAW_CAR_SALES_S3_URI = S3Uploader.upload(CAR_SALES_DATA_DIR, f"s3://{s3_bucket}/{s3_data_prefix}")

# Feature Group - Car Sales
CAR_SALES_FG_NAME = "car-data"
CAR_SALES_FG_ARN = f"arn:aws:sagemaker:{region}:{aws_account_id}:feature-group/{CAR_SALES_FG_NAME}"
CAR_SALES_FG_ROLE_ARN = offline_store_role
CAR_SALES_FG_OFFLINE_STORE_S3_URI = f"s3://{s3_bucket}/{s3_offline_store_prefix}"
CAR_SALES_FG_FEATURE_DEFINITIONS = [
    FeatureDefinition(feature_name="id", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="model", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="model_year", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="status", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="mileage", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="price", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="msrp", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="ingest_time", feature_type=FeatureTypeEnum.FRACTIONAL),
]
CAR_SALES_FG_RECORD_IDENTIFIER_NAME = "id"
CAR_SALES_FG_EVENT_TIME_FEATURE_NAME = "ingest_time"

# Feature Group - Aggregated Car Sales
AGG_CAR_SALES_FG_NAME = "car-data-aggregated"
AGG_CAR_SALES_FG_ARN = (
    f"arn:aws:sagemaker:{region}:{aws_account_id}:feature-group/{AGG_CAR_SALES_FG_NAME}"
)
AGG_CAR_SALES_FG_ROLE_ARN = offline_store_role
AGG_CAR_SALES_FG_OFFLINE_STORE_S3_URI = f"s3://{s3_bucket}/{s3_offline_store_prefix}"
AGG_CAR_SALES_FG_FEATURE_DEFINITIONS = [
    FeatureDefinition(feature_name="model_year_status", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="avg_mileage", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="max_mileage", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="avg_price", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="max_price", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="avg_msrp", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="max_msrp", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="ingest_time", feature_type=FeatureTypeEnum.FRACTIONAL),
]
AGG_CAR_SALES_FG_RECORD_IDENTIFIER_NAME = "model_year_status"
AGG_CAR_SALES_FG_EVENT_TIME_FEATURE_NAME = "ingest_time"


"""
Create the Feature Groups.
"""
from sagemaker.feature_store.feature_group import FeatureGroup

# Create Feature Group -  Car sale records.
car_sales_fg = FeatureGroup(
    name=CAR_SALES_FG_NAME,
    feature_definitions=CAR_SALES_FG_FEATURE_DEFINITIONS,
    sagemaker_session=sagemaker_session,
)

try:
    create_car_sales_fg_resp = car_sales_fg.create(
        record_identifier_name=CAR_SALES_FG_RECORD_IDENTIFIER_NAME,
        event_time_feature_name=CAR_SALES_FG_EVENT_TIME_FEATURE_NAME,
        s3_uri=CAR_SALES_FG_OFFLINE_STORE_S3_URI,
        enable_online_store=True,
        role_arn=CAR_SALES_FG_ROLE_ARN,
    )
    print(f"Created feature group {create_car_sales_fg_resp}")
except Exception as e:
    if "ResourceInUse" in str(e):
        print("Feature Group already exists")
    else:
        raise e

# Create Feature Group -  Aggregated car sales records.
agg_car_sales_fg = FeatureGroup(
    name=AGG_CAR_SALES_FG_NAME,
    feature_definitions=AGG_CAR_SALES_FG_FEATURE_DEFINITIONS,
    sagemaker_session=sagemaker_session,
)

try:
    create_agg_car_sales_fg_resp = agg_car_sales_fg.create(
        record_identifier_name=AGG_CAR_SALES_FG_RECORD_IDENTIFIER_NAME,
        event_time_feature_name=AGG_CAR_SALES_FG_EVENT_TIME_FEATURE_NAME,
        s3_uri=AGG_CAR_SALES_FG_OFFLINE_STORE_S3_URI,
        enable_online_store=True,
        role_arn=AGG_CAR_SALES_FG_ROLE_ARN,
    )
    print(f"Created feature group {create_agg_car_sales_fg_resp}")
    print("Sleeping for a bit, to let Feature Groups get ready.")
    time.sleep(15)
except Exception as e:
    if "ResourceInUse" in str(e):
        print("Feature Group already exists")
    else:
        raise e
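
A fixed sleep may be too short if feature group creation is slow. The following is a minimal sketch of a polling loop instead, assuming the status is read from the FeatureGroupStatus field of the response returned by FeatureGroup.describe(). The names wait_for_status and fake_describe are hypothetical; the stub stands in for describe() so that the sketch runs without AWS access.

```python
import time


def wait_for_status(describe_fn, target="Created", poll_seconds=5, max_attempts=60):
    """Poll describe_fn() until its FeatureGroupStatus equals target.

    describe_fn is any zero-argument callable returning a dict with a
    "FeatureGroupStatus" key, for example car_sales_fg.describe.
    """
    for _ in range(max_attempts):
        status = describe_fn().get("FeatureGroupStatus")
        if status == target:
            return status
        if status == "CreateFailed":
            raise RuntimeError("Feature group creation failed")
        time.sleep(poll_seconds)
    raise TimeoutError(f"Feature group did not reach status {target!r}")


# Stub standing in for FeatureGroup.describe (an assumption for illustration).
_responses = iter(["Creating", "Creating", "Created"])


def fake_describe():
    return {"FeatureGroupStatus": next(_responses)}


final_status = wait_for_status(fake_describe, poll_seconds=0)
print(final_status)
```

In this notebook, calls such as wait_for_status(car_sales_fg.describe) and wait_for_status(agg_car_sales_fg.describe) could take the place of time.sleep(15).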

@feature_processor

The following example demonstrates how to use the @feature_processor decorator to load data from Amazon S3 to a SageMaker Feature Group.

A @feature_processor decorated function automatically loads data from the configured inputs, applies the feature processing code and ingests the transformed data to a feature group.

[ ]:
%%write_and_execute car-data-ingestion.py

from sagemaker.feature_store.feature_processor import (
    feature_processor,
    FeatureGroupDataSource,
    CSVDataSource,
)


@feature_processor(
    inputs=[CSVDataSource(RAW_CAR_SALES_S3_URI)],
    output=CAR_SALES_FG_ARN,
    target_stores=["OfflineStore"],
)
def transform(raw_s3_data_as_df):
    """Load data from S3, perform basic feature engineering, store it in a Feature Group"""
    from pyspark.sql.functions import regexp_replace
    from pyspark.sql.functions import lit
    import time

    transformed_df = (
        # Regex patterns are raw strings so that backslash escapes reach the regex engine unchanged.
        raw_s3_data_as_df.withColumn("Price", regexp_replace("Price", r"\$", ""))
        # Rename Columns
        .withColumnRenamed("Id", "id")
        .withColumnRenamed("Model", "model")
        .withColumnRenamed("Year", "model_year")
        .withColumnRenamed("Status", "status")
        .withColumnRenamed("Mileage", "mileage")
        .withColumnRenamed("Price", "price")
        .withColumnRenamed("MSRP", "msrp")
        # Add Event Time
        .withColumn("ingest_time", lit(int(time.time())))
        # Remove punctuation and fluff; replace with NA
        .withColumn("mileage", regexp_replace("mileage", r"(,)|(mi\.)", ""))
        .withColumn("mileage", regexp_replace("mileage", "Not available", "NA"))
        .withColumn("price", regexp_replace("price", ",", ""))
        .withColumn("msrp", regexp_replace("msrp", r"(^MSRP\s\$)|(,)", ""))
        .withColumn("msrp", regexp_replace("msrp", "Not specified", "NA"))
        .withColumn("msrp", regexp_replace("msrp", r"\$\d+[a-zA-Z\s]+", "NA"))
        .withColumn("model", regexp_replace("model", r"^\d\d\d\d\s", ""))
    )

    transformed_df.show()

    return transformed_df


# Execute the FeatureProcessor and show the results.
transform()
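
To see what the cleanup patterns in transform do without starting Spark, the same regular expressions can be applied with Python's re module. This is only a sketch: the function names and the sample values are hypothetical and are not taken from data/car_data.csv, and Spark applies these patterns with its own regex engine.

```python
import re


def clean_mileage(value):
    # Drop thousands separators and the "mi." suffix, then normalize missing values.
    value = re.sub(r"(,)|(mi\.)", "", value)
    return re.sub(r"Not available", "NA", value)


def clean_price(value):
    # Drop the dollar sign and thousands separators.
    value = re.sub(r"\$", "", value)
    return re.sub(r",", "", value)


def clean_msrp(value):
    # Drop the leading "MSRP $" and separators, then map non-numeric text to NA.
    value = re.sub(r"(^MSRP\s\$)|(,)", "", value)
    value = re.sub(r"Not specified", "NA", value)
    return re.sub(r"\$\d+[a-zA-Z\s]+", "NA", value)


def clean_model(value):
    # Drop a leading four-digit year followed by whitespace.
    return re.sub(r"^\d\d\d\d\s", "", value)


# Hypothetical sample values, for illustration only.
samples = {
    "mileage": clean_mileage("12,345 mi."),
    "price": clean_price("$23,000"),
    "msrp": clean_msrp("MSRP $25,500"),
    "model": clean_model("2022 Acura TLX"),
}
print(samples)
```

Note that the mileage pattern removes "mi." but not the space before it, so a value written as "12,345 mi." keeps a trailing space after cleaning.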

@feature_processor + @remote

The following example demonstrates how to run your feature processing code remotely.

This is useful if you are working with large data sets that require hardware more powerful than locally available. You can decorate your code with the @remote decorator to run your local Python code as a single or multi-node distributed SageMaker training job. For more information on running your code as a SageMaker training job, see Run your local code as a SageMaker training job.

[ ]:
"""
Create a requirements.txt and specify sagemaker as a remote job dependency.
"""
sagemaker_version = sagemaker.__version__
with open("requirements.txt", "w") as file:
    file.write(f"sagemaker=={sagemaker_version}")
[ ]:
from pyspark.sql import DataFrame, SparkSession
from sagemaker.remote_function import remote
from sagemaker.feature_store.feature_processor import (
    feature_processor,
    CSVDataSource,
    FeatureGroupDataSource,
)
from sagemaker.remote_function.spark_config import SparkConfig


@remote(
    spark_config=SparkConfig(),
    instance_type="ml.m5.2xlarge",
    dependencies="./requirements.txt",
    # keep_alive_period_in_seconds=900  # Requires an account limit increase to enable warm pooling.
)
@feature_processor(
    inputs=[FeatureGroupDataSource(CAR_SALES_FG_ARN)],
    output=AGG_CAR_SALES_FG_ARN,
    target_stores=["OfflineStore"],
)
def aggregate(source_feature_group, spark):
    """
    Aggregate the data using a SQL query and UDF.
    """
    import time
    from pyspark.sql.types import StringType
    from pyspark.sql.functions import udf

    @udf(returnType=StringType())
    def custom_concat(*cols, delimiter: str = ""):
        return delimiter.join(cols)

    spark.udf.register("custom_concat", custom_concat)

    # Execute SQL string.
    source_feature_group.createOrReplaceTempView("car_data")
    aggregated_car_data = spark.sql(
        f"""
        SELECT
            custom_concat(model, "_", model_year, "_", status) as model_year_status,
            AVG(price) as avg_price,
            MAX(price) as max_price,
            AVG(mileage) as avg_mileage,
            MAX(mileage) as max_mileage,
            AVG(msrp) as avg_msrp,
            MAX(msrp) as max_msrp,
            "{int(time.time())}" as ingest_time
        FROM car_data
        GROUP BY model_year_status
        """
    )

    aggregated_car_data.show()

    return aggregated_car_data


# Execute the aggregate
aggregate()
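
The grouping performed by the SQL query above can be sketched in plain Python. The rows below are hypothetical and kept as strings, matching the STRING feature definitions of the car-data feature group; the sketch covers only the price column and ignores "NA" values. It also shows that the maximum of string values differs from the numeric maximum in Python. If the columns reach Spark as strings, MAX may behave in a similar way, so casting to a numeric type before aggregating may be worth considering.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical rows; values are strings, as in the car-data feature definitions.
rows = [
    {"model": "TLX", "model_year": "2022", "status": "New", "price": "9000"},
    {"model": "TLX", "model_year": "2022", "status": "New", "price": "10000"},
    {"model": "MDX", "model_year": "2021", "status": "Used", "price": "30000"},
]

# Build the same key as custom_concat(model, "_", model_year, "_", status).
groups = defaultdict(list)
for row in rows:
    key = "".join([row["model"], "_", row["model_year"], "_", row["status"]])
    groups[key].append(row["price"])

aggregated = {
    key: {
        "avg_price": mean(float(p) for p in prices),
        "max_price_numeric": max(float(p) for p in prices),
        # Comparing the raw strings orders "9000" after "10000".
        "max_price_as_string": max(prices),
    }
    for key, prices in groups.items()
}
print(aggregated)
```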

to_pipeline and schedule

The following example demonstrates how to operationalize your feature processor by promoting it to a SageMaker Pipeline and configuring a schedule to execute it on a regular basis. This example uses the aggregate function defined above. Note that to create a pipeline, the function must be decorated with both the @remote and @feature_processor decorators.

[ ]:
"""
Upload the car-data-ingestion.py file saved earlier to S3, to track it in SageMaker ML Lineage.
"""
from sagemaker.s3 import S3Uploader, s3_path_join

car_data_s3_uri = s3_path_join(
    "s3://", sagemaker_session.default_bucket(), "transformation_code", "car-data-ingestion.py"
)

S3Uploader.upload(local_path="car-data-ingestion.py", desired_s3_uri=car_data_s3_uri)

print(car_data_s3_uri)
[ ]:
"""
Wrap the transform function with the @remote decorator so that a Feature Processor Pipeline can be created for it.
"""
transform = remote(
    transform,
    spark_config=SparkConfig(),
    instance_type="ml.m5.2xlarge",
    dependencies="./requirements.txt",
)

In the following example, we create and schedule the pipeline using the to_pipeline and schedule methods. To test the job before scheduling it, use execute to start a single execution.

The SDK also provides two additional methods, describe and list_pipelines, for inspecting pipeline information.

[ ]:
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import TransformationCode

"""
Create a Feature Processor Pipeline and start one execution.
"""
car_data_pipeline_name = f"{CAR_SALES_FG_NAME}-ingestion-pipeline"
car_data_pipeline_arn = fp.to_pipeline(
    pipeline_name=car_data_pipeline_name,
    step=transform,
    transformation_code=TransformationCode(s3_uri=car_data_s3_uri),
)
print(f"Created SageMaker Pipeline: {car_data_pipeline_arn}.")

car_data_pipeline_execution_arn = fp.execute(pipeline_name=car_data_pipeline_name)
print(f"Started an execution with execution arn: {car_data_pipeline_execution_arn}")

fp.describe(pipeline_name=car_data_pipeline_name)
[ ]:
"""
Create a Feature Processor Pipeline for the aggregation and start one execution.
"""
car_data_aggregated_pipeline_name = f"{AGG_CAR_SALES_FG_NAME}-ingestion-pipeline"
car_data_aggregated_pipeline_arn = fp.to_pipeline(
    pipeline_name=car_data_aggregated_pipeline_name, step=aggregate
)
print(f"Created SageMaker Pipeline: {car_data_aggregated_pipeline_arn}.")

car_data_aggregated_pipeline_execution_arn = fp.execute(
    pipeline_name=car_data_aggregated_pipeline_name
)
print(f"Started an execution with execution arn: {car_data_aggregated_pipeline_execution_arn}")

"""
Schedule the pipeline.
"""
fp.schedule(
    pipeline_name=car_data_aggregated_pipeline_name,
    schedule_expression="rate(24 hours)",
    state="ENABLED",
)
print("Created a schedule.")

fp.describe(pipeline_name=car_data_aggregated_pipeline_name)
[ ]:
"""
Feature Processor Pipelines in this account.
"""
fp.list_pipelines()
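
The schedule created earlier uses the expression "rate(24 hours)". As a rough sketch of how such a rate expression maps to an interval, the helper below converts it to a timedelta. It assumes the rate(value unit) form with minutes, hours or days as the unit. The name rate_to_timedelta is hypothetical and not part of the SDK, and the SDK may accept other expression forms that this sketch does not cover.

```python
import re
from datetime import timedelta

# Matches expressions such as "rate(24 hours)" or "rate(1 day)".
_RATE_PATTERN = re.compile(r"^rate\((\d+) (minutes?|hours?|days?)\)$")


def rate_to_timedelta(expression):
    """Convert a rate(value unit) expression into a timedelta."""
    match = _RATE_PATTERN.match(expression)
    if match is None:
        raise ValueError(f"Not a rate expression: {expression!r}")
    value, unit = int(match.group(1)), match.group(2)
    # timedelta expects plural keyword names (minutes, hours, days).
    unit = unit if unit.endswith("s") else unit + "s"
    return timedelta(**{unit: value})


interval = rate_to_timedelta("rate(24 hours)")
print(interval)
```

A check like this can catch a malformed expression locally before the schedule is created.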

Exploring feature processing pipelines and ML Lineage

You can track scheduled SageMaker Pipelines with SageMaker Lineage in Amazon SageMaker Studio. This includes tracking scheduled executions, visualizing lineage to trace features back to their data sources, and viewing shared feature processing code all in one environment.

Find the feature groups that were created in this notebook and view the Pipeline Executions and Lineage tabs.

Clean up Resources

[ ]:
# Disable the scheduled pipeline
fp.schedule(
    pipeline_name=car_data_aggregated_pipeline_name,
    schedule_expression="rate(24 hours)",
    state="DISABLED",
)

print("Disabled the schedule.")
[ ]:
# Delete feature groups
car_sales_fg.delete()
agg_car_sales_fg.delete()

print("Feature groups are deleted.")
