Amazon SageMaker Feature Store: Ground Truth Classification labeling job output to Feature Store

This notebook demonstrates how to securely store the output of an image or text classification labeling job from Amazon SageMaker Ground Truth directly into Feature Store using a KMS key.

This notebook starts by reading in the output.manifest file, which is the output file from your classification labeling job from Amazon SageMaker Ground Truth. You can substitute your own Amazon S3 bucket and path to a method we provide, which downloads the file to your current working directory. Then we prepare the manifest file for ingestion to an online or offline feature store. We use a Key Management Service (KMS) key for server-side encryption to ensure that your data is securely stored in your feature store.

This notebook uses a KMS key for server-side encryption of your feature store. For more information on server-side encryption, see Feature Store: Encrypt Data in your Online or Offline Feature Store using KMS key.

To encrypt your data on the client side prior to ingestion, see Amazon SageMaker Feature Store: Client-side Encryption using AWS Encryption SDK for a demonstration.

Overview

  1. Set up.

  2. Prepare output.manifest for Feature Store.

  3. Create a feature group and ingest your data into it.

Prerequisites

This notebook uses the Python SDK library for Feature Store, and the Python 3 (Data Science) kernel. To encrypt your data with a KMS key for server-side encryption, you will need an active KMS key. If you do not have a KMS key, you can create one by following the KMS Policy Template steps, or you can visit the KMS section in the console and follow the prompts for creating a KMS key. This notebook is compatible with SageMaker Studio, Jupyter, and JupyterLab.
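If you prefer to create a key programmatically rather than through the console, a minimal sketch using the boto3 KMS client looks like the following. The function name and description string are illustrative, and it assumes your credentials have `kms:CreateKey` permission:

```python
def create_kms_key_for_feature_store(description="Feature Store demo key"):
    """Sketch: create a symmetric KMS key and return its ARN.

    Assumes AWS credentials with kms:CreateKey permission are configured.
    """
    import boto3  # local import keeps this sketch self-contained

    kms = boto3.client("kms")
    response = kms.create_key(Description=description)
    return response["KeyMetadata"]["Arn"]

# Uncomment to create a key and use its ARN later in this notebook:
# kms_key = create_kms_key_for_feature_store()
```

Keep the returned ARN; it is the value you will substitute for `kms_key` when creating the feature group below.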

Library dependencies:

  • sagemaker>=2.0.0

  • numpy

  • pandas

  • boto3

Data

This notebook uses a synthetic manifest file called output.manifest located in the data subfolder.
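Each line of output.manifest is a standalone JSON object. The exact key names depend on your labeling job's name; the record below is a hypothetical example for a job named my-classification-job, shown only to illustrate the shape the parser in this notebook expects (a source reference, a label, and a metadata dictionary):

```python
import json

# Hypothetical manifest line; real key names depend on your labeling job name.
sample_line = json.dumps({
    "source-ref": "s3://bucket/images/img-0001.jpg",
    "my-classification-job": 0,
    "my-classification-job-metadata": {
        "class-name": "cat",
        "confidence": 0.95,
        "type": "groundtruth/image-classification",
        "job-name": "labeling-job/my-classification-job",
        "human-annotated": "yes",
        "creation-date": "2021-01-01T00:00:00.000000",
    },
})

# Each manifest line parses into one dictionary per labeled item.
record = json.loads(sample_line)
print(list(record.keys()))
```

The parser later in this notebook walks these keys positionally, so the ordering shown here (source reference, label, metadata) is what it relies on.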

[ ]:
from time import gmtime, strftime
from sagemaker.feature_store.feature_group import FeatureGroup

import boto3
import json
import pandas as pd
import sagemaker
import time

Set up

[ ]:
sagemaker_session = sagemaker.Session()
s3_bucket_name = sagemaker_session.default_bucket()  # This is the bucket for your offline store.
prefix = 'sagemaker-featurestore-demo'
role = sagemaker.get_execution_role()
region = sagemaker_session.boto_region_name

Additional - Helper Method

Below is a method that you can use to get your manifest file from your S3 bucket into your current working directory.

[ ]:
def download_file_from_s3(bucket, path, filename):
    '''
    Download filename to your current directory.
    Parameters:
        bucket: S3 bucket name
        path: path to file
        filename: the name of the file you are downloading
    Returns:
        None
    '''
    import os.path
    if not os.path.exists(filename):
        s3 = boto3.client('s3')
        s3.download_file(
            Bucket=bucket,
            Key=path,
            Filename=filename
        )
# Supply the path to your output.manifest file from your Ground Truth labeling job.
# download_file_from_s3(public_s3_bucket_name, path='PATH', filename='output.manifest')

Prepare your manifest file for Feature Store.

Below is a method that parses your output.manifest file into a pandas DataFrame for ingestion into your feature store. It assumes that your manifest file is in your current working directory.

[ ]:
def create_dataframe_from_manifest(filename):
    '''
    Return a DataFrame containing all information from your output.manifest file.
    Parameters:
        filename: path to your output.manifest file. This should be the
        output.manifest file from an Amazon SageMaker Ground Truth classification labeling job.
    Returns:
        Data frame.

    Implementation details:
    i1 and i2: positional indices of the keys we loop through.
    k and j: k is a key of dictionary d whose value is itself a dictionary,
    and j is a key of that nested dictionary.
    '''
    item_name, classification, class_name_meta_data, \
    confidence_meta_data, type_meta_data, \
    job_name_meta_data, human_annotated_meta_data, creation_date = ([] for _ in range(8))

    for entry in open(filename, 'r'):
        d = json.loads(entry)
        for i1, k in enumerate(d.keys()):
            if i1 == 0:
                item_name.append(d[k])
            elif i1 == 1:
                classification.append(d[k])
            elif i1 == 2:
                for i2, j in enumerate(d[k].keys()):
                    if i2 == 0:
                        class_name_meta_data.append(d[k][j])
                    elif i2 == 1:
                        confidence_meta_data.append(d[k][j])
                    elif i2 == 2:
                        type_meta_data.append(d[k][j])
                    elif i2 == 3:
                        job_name_meta_data.append(d[k][j])
                    elif i2 == 4:
                        human_annotated_meta_data.append(d[k][j])
                    elif i2 == 5:
                        creation_date.append(d[k][j])
    return pd.DataFrame({
        "item_name": item_name,
        "classification": classification,
        "class_name_meta_data": class_name_meta_data,
        "confidence_meta_data": confidence_meta_data,
        "type_meta_data": type_meta_data,
        "job_name_meta_data": job_name_meta_data,
        "human_annotated_meta_data": human_annotated_meta_data,
        "creation_date": creation_date,
    })

# output.manifest is located in data/
df = create_dataframe_from_manifest('data/output.manifest')

Preview the parsed manifest file as a data frame

[ ]:
df
[ ]:
df.dtypes
[ ]:
def cast_object_to_string(data_frame):
    """
    Cast all columns of data_frame of type object to type string and return it.
    Parameters:
        data_frame: A pandas Dataframe
    Returns:
        Data frame
    """
    for label in data_frame.columns:
        if data_frame.dtypes[label] == object:
            data_frame[label] = data_frame[label].astype("str").astype("string")
    return data_frame
[ ]:
# Cast columns of df of type object to string.
df = cast_object_to_string(df)
[ ]:
df

Create a feature group and ingest data into it

Below we start by appending the EventTime feature to your data to timestamp each record, then we load the feature definitions and instantiate the FeatureGroup object. Lastly, we ingest the data into your feature store.

[ ]:
feature_group_name = 'ground-truth-classification-feature-group-' + \
                        strftime('%d-%H-%M-%S', gmtime())

Instantiate a FeatureGroup object for your data.

[ ]:
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)
[ ]:
record_identifier_feature_name = "item_name"

Append the EventTime feature to your data frame. This parameter is required, and time stamps each data point.

[ ]:
current_time_sec = int(round(time.time()))

event_time_feature_name = "EventTime"
# append EventTime feature
df[event_time_feature_name] = pd.Series([current_time_sec]*len(df), dtype="float64")
[ ]:
df

Load the feature definitions of your data into your feature group.

[ ]:
feature_group.load_feature_definitions(data_frame=df)

Create your feature group.

Important: Before running the next cell, you will need to substitute your KMS key ARN for kms_key; it is used for server-side encryption.

[ ]:
feature_group.create(
    s3_uri=f"s3://{s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name="EventTime",
    role_arn=role,
    enable_online_store=False,
    offline_store_kms_key_id=kms_key
)
[ ]:
feature_group.describe()

Continually check your offline store until your data is available in it.

[ ]:
def check_feature_group_status(feature_group):
    """
    Print when the feature group has been successfully created
    Parameters:
        feature_group: FeatureGroup
    Returns:
        None
    """
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group to be Created")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    print(f"FeatureGroup {feature_group.name} successfully created.")


check_feature_group_status(feature_group)

Ingest your data into your feature group.

[ ]:
feature_group.ingest(data_frame=df, max_workers=5, wait=True)
[ ]:
time.sleep(30)
[ ]:
s3_client = sagemaker_session.boto_session.client('s3', region_name=region)

feature_group_s3_uri = feature_group.describe().get("OfflineStoreConfig")\
.get("S3StorageConfig").get("ResolvedOutputS3Uri")

feature_group_s3_prefix = feature_group_s3_uri.replace(f"s3://{s3_bucket_name}/", "")
offline_store_contents = None
while offline_store_contents is None:
    objects_in_bucket = s3_client.list_objects(Bucket=s3_bucket_name,\
                                               Prefix=feature_group_s3_prefix)
    if ('Contents' in objects_in_bucket and len(objects_in_bucket['Contents']) > 1):
        offline_store_contents = objects_in_bucket['Contents']
    else:
        print('Waiting for data in offline store...\n')
        time.sleep(60)

print('Data available.')
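Once data is available, you can optionally query the offline store with Amazon Athena through the SDK's athena_query helper. Below is a minimal sketch, not executed in this notebook; the function name and the output S3 URI parameter are illustrative, and it assumes your role has Athena and AWS Glue permissions:

```python
def query_offline_store(feature_group, output_s3_uri, limit=10):
    """Sketch: run an Athena query against the feature group's offline store
    and return the results as a pandas DataFrame."""
    query = feature_group.athena_query()
    # The offline store is registered as a Glue table; query it by name.
    query.run(
        query_string=f'SELECT * FROM "{query.table_name}" LIMIT {limit}',
        output_location=output_s3_uri,
    )
    query.wait()
    return query.as_dataframe()

# Example (not executed):
# results_df = query_offline_store(feature_group, f"s3://{s3_bucket_name}/{prefix}/query_results")
```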

Clean up resources

Remove the Feature Group that was created.

[ ]:
feature_group.delete()

Next steps

In this notebook we covered how to securely store the output of an image or text classification labeling job from Amazon SageMaker Ground Truth directly into Feature Store using a KMS key.

To learn more about how server-side encryption is done with Feature Store, see Feature Store: Encrypt Data in your Online or Offline Feature Store using KMS key.

To learn more about how to do client-side encryption to encrypt your image dataset prior to storing it in your feature store, see Amazon SageMaker Feature Store: Client-side Encryption using AWS Encryption SDK. For more information on the AWS Encryption library, see AWS Encryption SDK library.

For detailed information about Feature Store, see the Developer Guide.

For a complete list of Feature Store notebooks, see Feature Store notebook examples.