Amazon SageMaker Feature Store: Introduction to Feature Store

This notebook demonstrates how to get started with Feature Store, create feature groups, and ingest data into them. These feature groups are stored in your Feature Store.

Feature groups are resources that contain metadata for all data stored in your Feature Store. A feature group is a logical grouping of features, defined in the feature store to describe records. A feature group’s definition is composed of a list of feature definitions, a record identifier name, and configurations for its online and offline store.
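
To make the three parts of a definition concrete, here is a minimal sketch written as a plain dictionary that uses the parameter names of the boto3 create_feature_group request. All values, including the feature name city_code, the bucket, and the prefix, are hypothetical placeholders; this notebook builds the same information through the Python SDK instead.

```python
# Illustrative only: every value below is a placeholder, not a real resource.
feature_group_definition = {
    "FeatureGroupName": "customers-feature-group-example",
    # The feature whose value uniquely identifies a record.
    "RecordIdentifierFeatureName": "customer_id",
    # The feature that timestamps each record.
    "EventTimeFeatureName": "EventTime",
    # The list of feature definitions: one name and one type per feature.
    "FeatureDefinitions": [
        {"FeatureName": "customer_id", "FeatureType": "Integral"},
        {"FeatureName": "city_code", "FeatureType": "Integral"},  # hypothetical feature
        {"FeatureName": "EventTime", "FeatureType": "Fractional"},
    ],
    # Configurations for the online and offline stores.
    "OnlineStoreConfig": {"EnableOnlineStore": True},
    "OfflineStoreConfig": {
        "S3StorageConfig": {"S3Uri": "s3://example-bucket/example-prefix"}
    },
}

# The record identifier and event time must both be declared as features.
feature_names = [d["FeatureName"] for d in feature_group_definition["FeatureDefinitions"]]
print(feature_names)
```

A real request also needs a RoleArn when an offline store is configured; it is left out here because the sketch never calls AWS.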

Overview

  1. Set up

  2. Create a feature group

  3. Ingest data into a feature group

Prerequisites

This notebook uses both the boto3 and SageMaker Python SDK libraries and runs on the Python 3 (Data Science) kernel. It works with Studio, Jupyter, and JupyterLab.

Library dependencies:

  • sagemaker>=2.100.0

  • numpy

  • pandas

Role requirements:

IMPORTANT: You must attach the following policies to your execution role:

  • AmazonS3FullAccess

  • AmazonSageMakerFeatureStoreAccess

[Image: policy]

Set up

[ ]:
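# This notebook requires SageMaker Python SDK >= 2.100.0 and boto3 >= 1.24.20.
# Install or upgrade both before importing them. If an older version was
# already imported in this kernel, restart the kernel after installing.
!pip install 'sagemaker>=2.100.0'
!pip install 'boto3>=1.24.20'

import sagemaker
import boto3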
[ ]:
import pandas as pd
import numpy as np
import io
from sagemaker.session import Session
from sagemaker import get_execution_role

prefix = "sagemaker-featurestore-introduction"
role = get_execution_role()

sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
s3_bucket_name = sagemaker_session.default_bucket()

Inspect your data

In this notebook example we ingest synthetic data. We read from ./data/feature_store_introduction_customer.csv and ./data/feature_store_introduction_orders.csv.

[ ]:
customer_data = pd.read_csv("data/feature_store_introduction_customer.csv")
orders_data = pd.read_csv("data/feature_store_introduction_orders.csv")
[ ]:
customer_data.head()
[ ]:
orders_data.head()

Below is an illustration of the steps the data goes through before it is ingested into a Feature Store. In this notebook, we illustrate the use case where you have data from multiple sources and want to store each source independently in a feature store. Our example considers data from a data warehouse (customer data) and data from a real-time streaming service (order data).

[Figure: data flow]

Create a feature group

We start by creating feature group names for customer_data and orders_data. We then create two feature groups, one for customer_data and another for orders_data.

[ ]:
from time import gmtime, strftime, sleep

customers_feature_group_name = "customers-feature-group-" + strftime("%d-%H-%M-%S", gmtime())
orders_feature_group_name = "orders-feature-group-" + strftime("%d-%H-%M-%S", gmtime())

Instantiate a FeatureGroup object for each of customer_data and orders_data.

[ ]:
from sagemaker.feature_store.feature_group import FeatureGroup

customers_feature_group = FeatureGroup(
    name=customers_feature_group_name, sagemaker_session=sagemaker_session
)
orders_feature_group = FeatureGroup(
    name=orders_feature_group_name, sagemaker_session=sagemaker_session
)
[ ]:
import time

current_time_sec = int(round(time.time()))

record_identifier_feature_name = "customer_id"

Append an EventTime feature to each data frame. This feature is required; it timestamps each record.

[ ]:
customer_data["EventTime"] = pd.Series([current_time_sec] * len(customer_data), dtype="float64")
orders_data["EventTime"] = pd.Series([current_time_sec] * len(orders_data), dtype="float64")
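
The cell above stores EventTime as Unix seconds in a float64 column, which maps to the Fractional feature type. As far as we know, Feature Store also accepts EventTime as an ISO 8601 string in UTC when the feature is declared as String. The following minimal sketch, using only the standard library, shows both representations of the same instant; the names as_fractional and as_iso8601 are introduced here for illustration only.

```python
import time
from datetime import datetime, timezone

# Same timestamp source as the notebook: whole seconds since the Unix epoch.
current_time_sec = int(round(time.time()))

# Representation used in this notebook: a float, stored as a Fractional feature.
as_fractional = float(current_time_sec)

# Alternative representation: an ISO 8601 UTC string, stored as a String feature.
as_iso8601 = datetime.fromtimestamp(current_time_sec, tz=timezone.utc).strftime(
    "%Y-%m-%dT%H:%M:%SZ"
)

print(as_fractional, as_iso8601)
```

Whichever representation you choose, use it consistently for every record in a feature group, because the EventTime feature has a single declared type.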

Load feature definitions to your feature group.

[ ]:
customers_feature_group.load_feature_definitions(data_frame=customer_data)
orders_feature_group.load_feature_definitions(data_frame=orders_data)
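
load_feature_definitions infers one feature definition per data frame column from the column's dtype. The sketch below approximates that inference with plain Python so the idea is visible without the SDK; infer_feature_type is a hypothetical helper, the dtype names are passed as strings, and the SDK's own rules may differ in detail.

```python
def infer_feature_type(dtype_name: str) -> str:
    """Map a pandas dtype name to a Feature Store feature type (approximation)."""
    if dtype_name.startswith(("int", "uint")):
        return "Integral"
    if dtype_name.startswith("float"):
        return "Fractional"
    if dtype_name in ("object", "string"):
        return "String"
    # Other dtypes (for example datetime64[ns]) need converting before ingestion.
    raise ValueError(f"Cannot infer a feature type for dtype {dtype_name!r}")


# Hypothetical column dtypes, as a data frame's dtypes attribute might report them.
column_dtypes = {"customer_id": "int64", "EventTime": "float64", "email": "object"}

feature_definitions = [
    {"FeatureName": name, "FeatureType": infer_feature_type(dtype)}
    for name, dtype in column_dtypes.items()
]
print(feature_definitions)
```

This is why the EventTime column is created with dtype="float64": an integer column would be inferred as Integral, which is not a valid type for the event time feature.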

Below we call create to create the two feature groups, customers_feature_group and orders_feature_group.

[ ]:
customers_feature_group.create(
    s3_uri=f"s3://{s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name="EventTime",
    role_arn=role,
    enable_online_store=True,
)

orders_feature_group.create(
    s3_uri=f"s3://{s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name="EventTime",
    role_arn=role,
    enable_online_store=True,
)

To confirm that the feature groups have been created, we use the DescribeFeatureGroup and ListFeatureGroups APIs to display them.

[ ]:
customers_feature_group.describe()
[ ]:
orders_feature_group.describe()
[ ]:
sagemaker_session.boto_session.client(
    "sagemaker", region_name=region
).list_feature_groups()  # We use the boto client to list FeatureGroups
[ ]:
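def check_feature_group_status(feature_group):
    # Poll until the feature group leaves the "Creating" state.
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group to be Created")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    # Any final status other than "Created" (for example "CreateFailed") is an error.
    if status != "Created":
        raise RuntimeError(f"Failed to create FeatureGroup {feature_group.name}: status is {status}")
    print(f"FeatureGroup {feature_group.name} successfully created.")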


check_feature_group_status(customers_feature_group)
check_feature_group_status(orders_feature_group)
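
The polling loop above waits indefinitely if the status never changes. One possible variation, sketched here under our own assumptions, adds a timeout and takes the status lookup as a callable so it can be exercised without AWS. wait_for_status and its parameters are hypothetical names, not part of the SDK.

```python
import time


def wait_for_status(get_status, pending, timeout_sec=300, poll_sec=5, sleep=time.sleep):
    """Call get_status() until it stops returning `pending`, then return the result.

    Raises TimeoutError if the status is still `pending` after timeout_sec seconds.
    """
    deadline = time.monotonic() + timeout_sec
    status = get_status()
    while status == pending:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Status still {pending!r} after {timeout_sec} seconds")
        sleep(poll_sec)
        status = get_status()
    return status


# Offline demonstration with a scripted sequence of statuses.
scripted = iter(["Creating", "Creating", "Created"])
final_status = wait_for_status(lambda: next(scripted), "Creating", sleep=lambda _: None)
print(final_status)

# With a real feature group, the lookup could be (not executed here):
# wait_for_status(lambda: customers_feature_group.describe().get("FeatureGroupStatus"), "Creating")
```

The caller still has to check the returned status, because the loop ends on failure states as well as on success.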

Add metadata to a feature

We can add searchable metadata fields to FeatureGroup features by using the UpdateFeatureMetadata API. The currently supported metadata fields are description and parameters.

[ ]:
from sagemaker.feature_store.inputs import FeatureParameter

customers_feature_group.update_feature_metadata(
    feature_name="customer_id",
    description="The ID of a customer. It is also used in orders_feature_group.",
    parameter_additions=[FeatureParameter("idType", "primaryKey")],
)

To view feature metadata, we can use DescribeFeatureMetadata to display that feature.

[ ]:
customers_feature_group.describe_feature_metadata(feature_name="customer_id")

Feature metadata fields are searchable. We use the Search API to find features whose metadata matches the given search criteria.

[ ]:
sagemaker_session.boto_session.client("sagemaker", region_name=region).search(
    Resource="FeatureMetadata",
    SearchExpression={
        "Filters": [
            {
                "Name": "FeatureGroupName",
                "Operator": "Contains",
                "Value": "customers-feature-group-",
            },
            {"Name": "Parameters.idType", "Operator": "Equals", "Value": "primaryKey"},
        ]
    },
)  # We use the boto client to search
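
If you search on several metadata parameters, it can help to build the request from a dictionary rather than writing each filter by hand. The helper below is a hypothetical convenience, not part of boto3 or the SageMaker SDK; it only assembles the keyword arguments and does not call AWS.

```python
def build_feature_metadata_search(group_name_fragment, parameters):
    """Return keyword arguments for a FeatureMetadata search request."""
    filters = [
        {"Name": "FeatureGroupName", "Operator": "Contains", "Value": group_name_fragment}
    ]
    # One equality filter per metadata parameter, addressed as Parameters.<key>.
    for key, value in parameters.items():
        filters.append({"Name": f"Parameters.{key}", "Operator": "Equals", "Value": value})
    return {"Resource": "FeatureMetadata", "SearchExpression": {"Filters": filters}}


search_kwargs = build_feature_metadata_search(
    "customers-feature-group-", {"idType": "primaryKey"}
)
print(search_kwargs)

# With a boto3 SageMaker client named sagemaker_client (not created here):
# sagemaker_client.search(**search_kwargs)
```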

Ingest data into a feature group

We can put data into the FeatureGroup by using the PutRecord API. Ingesting the data in this notebook should take less than a minute.

[ ]:
customers_feature_group.ingest(data_frame=customer_data, max_workers=3, wait=True)
[ ]:
orders_feature_group.ingest(data_frame=orders_data, max_workers=3, wait=True)
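
For reference, a PutRecord request carries one record as a list of feature name and string value pairs. The sketch below shows that shape for a single row; row_to_put_record_payload is a hypothetical helper, the EventTime value is made up, and skipping None values is a simplification of our own.

```python
def row_to_put_record_payload(row):
    """Convert one row (a mapping of feature name to value) to a PutRecord record."""
    return [
        {"FeatureName": name, "ValueAsString": str(value)}
        for name, value in row.items()
        if value is not None  # features with no value are left out of the record
    ]


# Hypothetical row; only customer_id comes from this notebook.
sample_row = {"customer_id": 573291, "EventTime": 1700000000.0, "notes": None}
payload = row_to_put_record_payload(sample_row)
print(payload)

# With a boto3 "sagemaker-featurestore-runtime" client (not created here):
# runtime_client.put_record(FeatureGroupName=customers_feature_group_name, Record=payload)
```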

Using an arbitrary customer record ID, 573291, we call get_record to check that the data has been ingested into the feature group.

[ ]:
customer_id = 573291
sample_record = sagemaker_session.boto_session.client(
    "sagemaker-featurestore-runtime", region_name=region
).get_record(
    FeatureGroupName=customers_feature_group_name, RecordIdentifierValueAsString=str(customer_id)
)
[ ]:
sample_record
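
The get_record response holds the record as a list of FeatureName and ValueAsString pairs under the Record key, and that key is absent when no record matches. A small sketch for turning such a response into a dictionary follows; record_to_dict is a hypothetical helper and fake_response is hand-written sample data, not output from this notebook.

```python
def record_to_dict(response):
    """Flatten a get_record response into {feature name: value as string}."""
    return {
        feature["FeatureName"]: feature["ValueAsString"]
        for feature in response.get("Record", [])
    }


# Hand-written stand-in for a real response.
fake_response = {
    "Record": [
        {"FeatureName": "customer_id", "ValueAsString": "573291"},
        {"FeatureName": "EventTime", "ValueAsString": "1700000000.0"},
    ]
}
print(record_to_dict(fake_response))
print(record_to_dict({}))  # no matching record gives an empty dictionary
```

All values come back as strings, so convert them to numeric types yourself where needed.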

We use batch_get_record to check that the data has been ingested into both feature groups by providing customer IDs.

[ ]:
all_records = sagemaker_session.boto_session.client(
    "sagemaker-featurestore-runtime", region_name=region
).batch_get_record(
    Identifiers=[
        {
            "FeatureGroupName": customers_feature_group_name,
            "RecordIdentifiersValueAsString": ["573291", "109382", "828400", "124013"],
        },
        {
            "FeatureGroupName": orders_feature_group_name,
            "RecordIdentifiersValueAsString": ["573291", "109382", "828400", "124013"],
        },
    ]
)
[ ]:
all_records
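
Reading the raw response by eye gets tedious with many IDs. The sketch below compares the identifiers that were requested with those returned under the Records key and reports what is missing for each feature group. missing_identifiers is a hypothetical helper, and the request and response shown are hand-written samples with made-up group names.

```python
def missing_identifiers(requested, response):
    """Return {feature group name: [requested IDs not present in the response]}."""
    found = {
        (record["FeatureGroupName"], record["RecordIdentifierValueAsString"])
        for record in response.get("Records", [])
    }
    missing = {}
    for item in requested:
        group = item["FeatureGroupName"]
        absent = [
            record_id
            for record_id in item["RecordIdentifiersValueAsString"]
            if (group, record_id) not in found
        ]
        if absent:
            missing[group] = absent
    return missing


# Hand-written samples standing in for the real request and response.
requested = [
    {"FeatureGroupName": "customers-example", "RecordIdentifiersValueAsString": ["573291", "109382"]},
    {"FeatureGroupName": "orders-example", "RecordIdentifiersValueAsString": ["573291"]},
]
fake_response = {
    "Records": [
        {"FeatureGroupName": "customers-example", "RecordIdentifierValueAsString": "573291", "Record": []},
        {"FeatureGroupName": "orders-example", "RecordIdentifierValueAsString": "573291", "Record": []},
    ],
    "Errors": [],
    "UnprocessedIdentifiers": [],
}
print(missing_identifiers(requested, fake_response))
```

A complete check would also inspect the Errors and UnprocessedIdentifiers lists in the response, which this sketch ignores.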

Add features to a feature group

To add features to a FeatureGroup that already contains ingested data, we can use the UpdateFeatureGroup API and then re-ingest the data from the updated dataset.

[ ]:
from sagemaker.feature_store.feature_definition import StringFeatureDefinition

customers_feature_group.update(
    feature_additions=[StringFeatureDefinition("email"), StringFeatureDefinition("name")]
)

Verify whether the FeatureGroup has been updated successfully.

[ ]:
def check_last_update_status(feature_group):
    # LastUpdateStatus is a dict; its "Status" field holds the state string.
    last_update_status = feature_group.describe().get("LastUpdateStatus")["Status"]
    while last_update_status == "InProgress":
        print("Waiting for FeatureGroup to be updated")
        time.sleep(5)
        last_update_status = feature_group.describe().get("LastUpdateStatus")["Status"]
    if last_update_status == "Successful":
        print(f"FeatureGroup {feature_group.name} successfully updated.")
    else:
        print(
            f"FeatureGroup {feature_group.name} update failed. The LastUpdateStatus is "
            + str(last_update_status)
        )


check_last_update_status(customers_feature_group)

Inspect the new dataset.

[ ]:
customer_data_updated = pd.read_csv("data/feature_store_introduction_customer_updated.csv")
[ ]:
customer_data_updated.head()

Append the EventTime feature to the new data frame.

[ ]:
# Size the EventTime column to the updated data frame, not the original one.
customer_data_updated["EventTime"] = pd.Series(
    [current_time_sec] * len(customer_data_updated), dtype="float64"
)

Ingest the new dataset.

[ ]:
customers_feature_group.ingest(data_frame=customer_data_updated, max_workers=3, wait=True)

Use batch_get_record again to check that all updated data has been ingested into customers_feature_group by providing customer IDs.

[ ]:
updated_customers_records = sagemaker_session.boto_session.client(
    "sagemaker-featurestore-runtime", region_name=region
).batch_get_record(
    Identifiers=[
        {
            "FeatureGroupName": customers_feature_group_name,
            "RecordIdentifiersValueAsString": ["573291", "109382", "828400", "124013"],
        }
    ]
)
[ ]:
updated_customers_records

Clean up

Here we remove the Feature Groups we created.

[ ]:
customers_feature_group.delete()
orders_feature_group.delete()

Next steps

In this notebook you learned how to get started with Feature Store, create feature groups, and ingest data into them.

For an advanced example on how to use Feature Store for a Fraud Detection use-case, see Fraud Detection with Feature Store.

For detailed information about Feature Store, see the Developer Guide.

Programmer's note

In this notebook we used a variety of API calls. Most of them are accessible through the Python SDK, but some exist only in boto3. You can invoke the Python SDK API calls directly on your Feature Store objects. To invoke API calls that exist only in boto3, you must first obtain a boto client through your boto and sagemaker sessions, e.g. sagemaker_session.boto_session.client().

Below we list API calls used in this notebook that exist within the Python SDK and ones that exist in boto3 for your reference.

Python SDK API Calls

  • describe()

  • ingest()

  • delete()

  • create()

  • load_feature_definitions()

  • update()

  • update_feature_metadata()

  • describe_feature_metadata()

Boto3 API Calls

  • list_feature_groups()

  • get_record()

  • batch_get_record()

  • search()