Enterprise-Grade ML, Part 1: Prepare and Use the ML Gateway Pattern for Inference

Data Preparation and Inference Using SageMaker Feature Store

Motivation


Data science projects often start in an experimental phase: feature transformations are tried, algorithms are selected and evaluated to see whether they fit the data distribution well enough for reliable predictions, hyperparameters are tuned, and so on.

As an organization matures in its Machine Learning (ML) journey, it transitions to an automated ML, or MLOps, phase in which the pipelines for data preparation, training, deployment, and monitoring all need to be automated.

To raise projects to an enterprise scale that fulfills business needs and sustains business-level continuity, scalability, security, and performance, integrating data science experiments with machine learning deployment patterns and best practices grows in importance, and doing so early will save you time and money.

In this blog series on ML patterns, we start by focusing on deployment patterns and best practices within the ML lifecycle: the considerations and options that present themselves post-training, in the serving, inference, and prediction phases.

There are many ways to expose a model deployed as a hosted SageMaker endpoint; these variations are summarized in the ML Gateway Pattern, with mandatory and optional components. Through this series of blogs we will outline the options, their context, and their pros and cons to help you decide which components to use for your specific workload and use case.

Architecture


Here we break down the example in this blog into four parts:

  1. Data prep

    1. Load the CSV files into S3

    2. Create and populate Feature Groups in SageMaker Feature Store that can be used for training our model

    3. Use Athena to load the data from the Feature Store into a DataFrame

  2. Training and deployment

  3. Inference

  4. MLOps: deployment of a CloudFormation template

(Architecture diagram)

Import Libraries and SageMaker Session Variables


[ ]:
import importlib
import subprocess
import sys


def import_or_install(package):
    """Import a package, installing it with pip first if it is missing."""
    try:
        importlib.import_module(package)
    except ImportError:
        # pip.main() is no longer supported; call pip through the current interpreter instead
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])


import_or_install("sagemaker")
import_or_install("boto3")
[ ]:
import pandas as pd
import sagemaker
import boto3
import os
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost import XGBoost
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
from sagemaker.session import production_variant
from sagemaker.model_monitor import DataCaptureConfig, CronExpressionGenerator, DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
import datetime
import statistics
import numpy as np
import requests
import shutil
import time
import helpers

Session variables

[ ]:
role = sagemaker.get_execution_role()

# Session variables
sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = "ml-gateway"
region = sess.boto_region_name

print(f"Region: {region}\nBucket: {bucket}\nPrefix: {prefix}\n")

# Data source location
claims_url = "https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/main/end_to_end/fraud_detection/data/claims_preprocessed.csv"
customers_url = "https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/main/end_to_end/fraud_detection/data/customers_preprocessed.csv"

# Feature Store parameters
claims_feature_group_name = "claims-feature-group"
customers_feature_group_name = "customers-feature-group"
claims_feature_group_description = "Claims feature group"
customers_feature_group_description = "Customers feature group"
id_name = "policy_id"
event_time_name = "event_time"
claims_offline_feature_group_bucket = f"s3://{bucket}/claims-feature-group"
customers_offline_feature_group_bucket = f"s3://{bucket}/customers-feature-group"

# SageMaker training
s3_input_train_uri = f"s3://{bucket}/{prefix}/data/train/train.csv"
s3_input_test_uri = f"s3://{bucket}/{prefix}/data/test/test.csv"
train_instance_type = "ml.m4.xlarge"
train_base_job_name = "xgboost-model"

# Model names
model1_name = "xgboost-model-1"
model2_name = "xgboost-model-2"

# SageMaker endpoint
endpoint_name = "xgboost-claims-fraud"
deploy_instance_type = "ml.m4.xlarge"

# SageMaker Model Monitor
monitor_schedule_name = f"{prefix}-monitor-schedule"

Data and Features


The data we are using is the same synthetic data that was created in the blog post on the End-to-End ML Lifecycle with Amazon SageMaker; its use case is auto claim fraud detection. We will use the same datasets to demonstrate the ML Gateway Pattern in this example.

Get data

[ ]:
# Get claims and customer data from existing aws-samples location
claims_df = pd.read_csv(claims_url)
customers_df = pd.read_csv(customers_url)

# If your DataFrame doesn't have a timestamp, you can just create one
timestamp = pd.to_datetime("now").timestamp()
claims_df[event_time_name] = timestamp
customers_df[event_time_name] = timestamp
[ ]:
claims_dtypes, customers_dtypes = helpers.get_datatypes()
claims_df = claims_df.astype(claims_dtypes)
customers_df = customers_df.astype(customers_dtypes)

Create Feature Groups

[ ]:
claims_feature_group, claims_feature_group_exists = helpers.create_feature_group(
    claims_feature_group_name,
    claims_feature_group_description,
    claims_df,
    id_name,
    event_time_name,
    claims_offline_feature_group_bucket,
    sess,
    role,
)

customers_feature_group, customers_feature_group_exists = helpers.create_feature_group(
    customers_feature_group_name,
    customers_feature_group_description,
    customers_df,
    id_name,
    event_time_name,
    customers_offline_feature_group_bucket,
    sess,
    role,
)
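
The `create_feature_group` helper comes from the accompanying helpers module. As a point of reference, here is a minimal sketch of what such a helper might look like, assuming it wraps the SageMaker `FeatureGroup` API and skips creation when a group with that name already exists; the real implementation may differ in details such as error handling.

[ ]:
# Hypothetical sketch of a create_feature_group-style helper (not the actual helpers.py code)
from sagemaker.feature_store.feature_group import FeatureGroup


def create_feature_group_sketch(
    name, description, df, record_identifier, event_time_feature, offline_s3_uri, sagemaker_session, role_arn
):
    feature_group = FeatureGroup(name=name, sagemaker_session=sagemaker_session)
    summaries = sagemaker_session.sagemaker_client.list_feature_groups(NameContains=name)[
        "FeatureGroupSummaries"
    ]
    exists = any(s["FeatureGroupName"] == name for s in summaries)
    if not exists:
        # Derive the feature definitions (names and types) from the DataFrame dtypes
        feature_group.load_feature_definitions(data_frame=df)
        feature_group.create(
            s3_uri=offline_s3_uri,
            record_identifier_name=record_identifier,
            event_time_feature_name=event_time_feature,
            role_arn=role_arn,
            enable_online_store=True,
            description=description,
        )
        # Wait until the Feature Group is ready before ingesting records
        while feature_group.describe()["FeatureGroupStatus"] == "Creating":
            time.sleep(5)
    return feature_group, exists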

Add data to Feature Groups

[ ]:
# Ingest data to Feature Store
feature_store_client = boto3.client("sagemaker-featurestore-runtime", region_name=region)
if not claims_feature_group_exists:
    helpers.ingest_df_to_feature_group(claims_df, claims_feature_group_name, feature_store_client)
if not customers_feature_group_exists:
    helpers.ingest_df_to_feature_group(
        customers_df, customers_feature_group_name, feature_store_client
    )

Get training and test data from Feature Store

Wait for the data to be replicated to the offline Feature Store (this can take up to 15 minutes).

[ ]:
# Replication to the offline Feature Store is asynchronous and can take up to ~15 minutes
time.sleep(900)
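
If you prefer not to block for a fixed 15 minutes, a rough alternative is to poll the offline store's S3 prefix until objects start to appear. This is only a sketch: the first objects appearing does not guarantee that every record has been replicated, and replication time varies.

[ ]:
# Sketch: poll the offline store S3 location instead of sleeping for a fixed duration
def wait_for_offline_store(offline_s3_uri, timeout_seconds=1800, poll_seconds=60):
    s3 = boto3.client("s3", region_name=region)
    bucket_name, _, key_prefix = offline_s3_uri.replace("s3://", "").partition("/")
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=key_prefix)
        if resp.get("KeyCount", 0) > 0:
            return True
        time.sleep(poll_seconds)
    return False

# wait_for_offline_store(claims_offline_feature_group_bucket)
# wait_for_offline_store(customers_offline_feature_group_bucket)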

Then query the Feature Store with Athena to get the training and test data.

[ ]:
claims_query = claims_feature_group.athena_query()
customers_query = customers_feature_group.athena_query()

claims_table = claims_query.table_name
customers_table = customers_query.table_name
database_name = customers_query.database

# The symmetric difference keeps the columns unique to each table (the shared
# policy_id and event_time columns drop out); policy_id is then re-added explicitly.
feature_columns = list(set(claims_df.columns) ^ set(customers_df.columns))
feature_columns_string = ", ".join(f'"{c}"' for c in feature_columns)
feature_columns_string = f'"{claims_table}".{id_name} as {id_name}, ' + feature_columns_string

query_string = f"""
SELECT {feature_columns_string}
FROM "{claims_table}" LEFT JOIN "{customers_table}"
ON "{claims_table}".{id_name} = "{customers_table}".{id_name}
"""
[ ]:
claims_query.run(query_string=query_string, output_location=f"s3://{bucket}/{prefix}/query_results")
claims_query.wait()
dataset = claims_query.as_dataframe()

# Create data directory to store local data
data_dir = os.path.join(os.getcwd(), "data")
os.makedirs(data_dir, exist_ok=True)

dataset.to_csv("data/claims_customer.csv")

Save training and test sets locally.

[ ]:
col_order = ["fraud"] + list(dataset.drop(["fraud", "policy_id"], axis=1).columns)
train = dataset.sample(frac=0.80, random_state=0)[col_order]
test = dataset.drop(train.index)[col_order]
[ ]:
train.to_csv("data/train.csv", index=False)
test.to_csv("data/test.csv", index=False)

test = test.reset_index(drop=True)

Upload datasets to S3.

[ ]:
s3_client = boto3.client("s3", region_name=region)
s3_client.upload_file(
    Filename="data/train.csv", Bucket=bucket, Key=f"{prefix}/data/train/train.csv"
)
s3_client.upload_file(Filename="data/test.csv", Bucket=bucket, Key=f"{prefix}/data/test/test.csv")

Train and deploy an XGBoost model

[ ]:
s3_input_train = TrainingInput(s3_input_train_uri, content_type="csv")
s3_input_test = TrainingInput(s3_input_test_uri, content_type="csv")
[ ]:
hyperparameters = {
    "max_depth": "3",
    "eta": "0.2",
    "objective": "binary:logistic",
    "num_round": "100",
}

estimator_parameters = {
    "entry_point": "code/train_deploy.py",
    "instance_type": train_instance_type,
    "instance_count": 1,
    "hyperparameters": hyperparameters,
    "role": role,
    "base_job_name": train_base_job_name,
    "framework_version": "1.0-1",
    "py_version": "py3",
}

estimator = XGBoost(**estimator_parameters)
inputs = {"train": s3_input_train, "test": s3_input_test}

# Train the model if it hasn't already been trained
existing_training_jobs = sess.sagemaker_client.list_training_jobs(
    NameContains=train_base_job_name, MaxResults=30
)["TrainingJobSummaries"]
if not existing_training_jobs:
    estimator.fit(inputs)
# Otherwise attach to the most recent matching training job
else:
    latest_training_job_name = existing_training_jobs[0]["TrainingJobName"]
    estimator = XGBoost.attach(latest_training_job_name)

Create two SageMaker models to deploy behind a single endpoint using SageMaker Production Variants.

[ ]:
model1 = estimator.create_model(entry_point="code/train_deploy.py", role=role, name=model1_name)
model1._create_sagemaker_model(instance_type=deploy_instance_type)

model2 = estimator.create_model(entry_point="code/train_deploy.py", role=role, name=model2_name)
model2._create_sagemaker_model(instance_type=deploy_instance_type)
[ ]:
variant_1 = production_variant(
    model_name=model1_name,
    instance_type=deploy_instance_type,
    initial_instance_count=1,
    variant_name="Variant1",
    initial_weight=1,
)


variant_2 = production_variant(
    model_name=model2_name,
    instance_type=deploy_instance_type,
    initial_instance_count=1,
    variant_name="Variant2",
    initial_weight=1,
)

Set up Model Monitor’s Data Capture for the Production Variants.

[ ]:
s3_capture_upload_path = f"s3://{bucket}/{prefix}/model_monitor"

data_capture_config = DataCaptureConfig(
    enable_capture=True, sampling_percentage=100, destination_s3_uri=s3_capture_upload_path
)

data_capture_config_dict = data_capture_config._to_request_dict()

Now create the Production Variant endpoint.

[ ]:
# If not already deployed, create the endpoint from the two production variants
existing_endpoints = sess.sagemaker_client.list_endpoints(
    NameContains=endpoint_name, MaxResults=30
)["Endpoints"]
if not existing_endpoints:
    sess.endpoint_from_production_variants(
        name=endpoint_name,
        production_variants=[variant_1, variant_2],
        data_capture_config_dict=data_capture_config_dict,
    )

# Either way, attach a Predictor to the endpoint
predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sess,
    serializer=CSVSerializer(),
    deserializer=CSVDeserializer(),
)
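
As an optional sanity check (assuming the endpoint has finished creating), you can send the first row of the local test set directly to the predictor; the label column is dropped before sending, so the payload matches the feature order the model was trained on.

[ ]:
# Optional smoke test: send one unlabeled test record straight to the endpoint
sample_record = test.drop("fraud", axis=1).iloc[0].tolist()
print(predictor.predict(sample_record))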

Create a baseline.

[ ]:
# Baseline data is the training data that we saved as CSV
baseline_data_uri = s3_input_train_uri
baseline_results_uri = f"s3://{bucket}/{prefix}/model_monitor/baseline_output"

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri=baseline_results_uri,
    wait=True,
)
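
Once the baselining job finishes, you can peek at the statistics and constraints it suggested. This is a sketch using the SageMaker SDK's baseline accessors; the feature entries correspond to the columns in the training CSV.

[ ]:
# Inspect the statistics and constraints suggested by the baselining job
baseline_job = my_default_monitor.latest_baselining_job
statistics_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
print(statistics_df.head())
print(constraints_df.head())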

Create the monitoring schedule.

[ ]:
baseline_violations_uri = f"s3://{bucket}/{prefix}/model_monitor/violations"

my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=monitor_schedule_name,
    endpoint_input=endpoint_name,
    output_s3_uri=baseline_violations_uri,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

desc_schedule_result = my_default_monitor.describe_schedule()
print("Schedule status: {}".format(desc_schedule_result["MonitoringScheduleStatus"]))

Test Feature Store in Real-Time Inference

See how you can aggregate data from multiple Feature Groups and use those features as input to a SageMaker endpoint in a low-latency fashion.

[ ]:
def get_prediction(policy_id, featurestore_runtime):
    t0 = datetime.datetime.now()
    customer_record_response = featurestore_runtime.get_record(
        FeatureGroupName="customers-feature-group", RecordIdentifierValueAsString=str(policy_id)
    )

    claims_record_response = featurestore_runtime.get_record(
        FeatureGroupName="claims-feature-group", RecordIdentifierValueAsString=str(policy_id)
    )

    t1 = datetime.datetime.now()

    customer_record = customer_record_response["Record"]
    customer_df = pd.DataFrame(customer_record).set_index("FeatureName")
    claims_record = claims_record_response["Record"]
    claims_df = pd.DataFrame(claims_record).set_index("FeatureName")

    joined_df = pd.concat([claims_df, customer_df]).loc[col_order].drop("fraud")
    payload = ",".join(joined_df["ValueAsString"])
    prediction = float(
        predictor.predict(
            payload, initial_args={"ContentType": "text/csv"}, target_variant="Variant1"
        )[0][0]
    )

    diff = t1 - t0
    minutes, seconds = divmod(diff.total_seconds(), 60)
    timer.append(seconds)

    return prediction


# Instantiate Feature Store Runtime client
boto_session = boto3.Session(region_name=region)
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)

MAX_POLICY_IDS = 100
timer = []
for policy_id in range(1, MAX_POLICY_IDS + 1):
    prediction = get_prediction(policy_id, featurestore_runtime)
    print(f"Probablitity the claim from policy {int(policy_id)} is fraudulent:", prediction)

Get latencies.

[ ]:
timer_array = np.array(timer)
print(
    f"p95: {np.percentile(timer_array,95)}, p99: {np.percentile(timer_array,99)}, mean: {np.mean(timer_array)} for {MAX_POLICY_IDS} distinct Feature Store gets across two Feature Groups"
)

Create ML Gateway with Feature Store

First, write out a Lambda function script. Make sure to replace the ENDPOINT_NAME variable with the name of your deployed SageMaker endpoint.

The Lambda function will check if the policy ID from a user request already exists in Feature Store. If so, it will fetch the features associated with the policy ID from both Feature Groups and feed them as inputs into the SageMaker endpoint.

If there are no features in the Feature Store for the given policy ID, the function takes the raw data from the request, transforms it, stores it in the Feature Store, and returns a prediction to the user.

[ ]:
%%writefile lambda_function.py

import os
import io
import boto3
import json
import pandas as pd
import datetime as datetime
import re

ENDPOINT_NAME = "xgboost-claims-fraud"  # REPLACE WITH SAGEMAKER ENDPOINT NAME
ENDPOINT_NAME = ENDPOINT_NAME.strip()
runtime = boto3.client("runtime.sagemaker")

# Instantiate Feature Store Runtime client
# get current region
region = boto3.Session().region_name
print(f"region : {region}\n")

boto_session = boto3.Session(region_name=region)
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)


def get_payload(policy_id):
    """Get records associated with the policy id from both
    Feature Groups

    Args:
        policy_id: int or str

    Returns:
        str
    """

    col_order = [
        "fraud",
        "driver_relationship_child",
        "num_insurers_past_5_years",
        "incident_severity",
        "driver_relationship_self",
        "authorities_contacted_none",
        "months_as_customer",
        "driver_relationship_na",
        "policy_liability",
        "collision_type_side",
        "collision_type_front",
        "incident_month",
        "num_claims_past_year",
        "customer_gender_male",
        "num_vehicles_involved",
        "customer_education",
        "authorities_contacted_ambulance",
        "police_report_available",
        "incident_dow",
        "vehicle_claim",
        "collision_type_rear",
        "customer_gender_female",
        "incident_day",
        "policy_state_or",
        "customer_age",
        "policy_state_wa",
        "injury_claim",
        "policy_state_id",
        "driver_relationship_spouse",
        "policy_deductable",
        "num_injuries",
        "collision_type_na",
        "driver_relationship_other",
        "incident_hour",
        "incident_type_theft",
        "incident_type_breakin",
        "num_witnesses",
        "policy_state_ca",
        "policy_state_nv",
        "incident_type_collision",
        "auto_year",
        "authorities_contacted_police",
        "policy_state_az",
        "policy_annual_premium",
        "total_claim_amount",
        "authorities_contacted_fire",
    ]
    t0 = datetime.datetime.now()
    customer_record_response = featurestore_runtime.get_record(
        FeatureGroupName="customers-feature-group", RecordIdentifierValueAsString=str(policy_id)
    )
    claims_record_response = featurestore_runtime.get_record(
        FeatureGroupName="claims-feature-group", RecordIdentifierValueAsString=str(policy_id)
    )
    t1 = datetime.datetime.now()
    customer_record = customer_record_response["Record"]
    customer_df = pd.DataFrame(customer_record).set_index("FeatureName")
    claims_record = claims_record_response["Record"]
    claims_df = pd.DataFrame(claims_record).set_index("FeatureName")
    joined_df = pd.concat([claims_df, customer_df]).loc[col_order].drop("fraud")
    payload = ",".join(joined_df["ValueAsString"])
    return payload


def response(message, status_code):
    return {
        "statusCode": str(status_code),
        "body": json.dumps(message),
        "headers": {"Content-Type": "application/json", "Access-Control-Allow-Origin": "*"},
    }


def one_hot_encoder(df: pd.DataFrame, input_column: str, categories: list) -> None:
    """A one hot encoder similiar to the one in Data Wrangler.

    Args:
        df: A Pandas DataFrame.
        input_column: The name of the column which contains the categorical values.
        categories: The list of categorical values which was available during training.

    Returns:
        None: The DataFrame is updated in place with the encoded features.

    """

    # NaN types are converted to literal `na` in Data Wrangler during one-hot encoding
    if "na" in categories:
        df[input_column].fillna("na", inplace=True)
    for c in categories:
        df[f"{input_column}_{c}"] = 0
    for idx, val in df[input_column].items():  # Series.iteritems() was removed in pandas 2.0
        df.at[idx, f"{input_column}_{val}"] = 1
    df.drop(input_column, axis=1, inplace=True)


def transform_claims_data(claims_data: dict) -> pd.DataFrame:
    """Transforms the inbound claims data to the feature store format.

    Args:
        claims_data: A dictionary containing the claims data.

    Returns:
        pd.DataFrame: A Pandas DataFrame containing the processed claims data.
    """

    claims_df = pd.DataFrame.from_dict(claims_data)

    # (3) convert cat columns to lowercase
    claims_df = claims_df.applymap(lambda s: s.lower() if type(s) == str else s)

    # (4-6) format string
    # Special characters to replace with spaces in categorical values
    invalid_char = re.compile("[-@#$%^&*()_+=/`~{}|<>?]")
    claims_df["driver_relationship"].replace(invalid_char, " ", regex=True, inplace=True)
    claims_df["collision_type"].replace(invalid_char, " ", regex=True, inplace=True)
    claims_df["incident_type"].replace(invalid_char, " ", regex=True, inplace=True)

    # (7-10) one hot encode
    one_hot_encoder(claims_df, "driver_relationship", ["spouse", "self", "child", "na", "other"])
    one_hot_encoder(claims_df, "incident_type", ["collision", "breakin", "theft"])
    one_hot_encoder(claims_df, "collision_type", ["front", "rear", "side", "na"])
    one_hot_encoder(claims_df, "authorities_contacted", ["none", "police", "ambulance", "fire"])

    # (11-12) ordinal encode
    claims_df["incident_severity"] = claims_df["incident_severity"].replace(
        {"minor": 0, "major": 1, "totaled": 2, "na": 3}
    )
    claims_df["police_report_available"] = claims_df["police_report_available"].replace(
        {"no": 0, "yes": 1, "na": 2}
    )

    # (13) create event_time
    claims_df["event_time"] = pd.to_datetime("now").timestamp()

    # NOTE: remaining steps in Flow file involve casting encoded columns from Float to Long, which is not
    # necessary here.

    return claims_df


def transform_customers_data(customers_data: dict) -> pd.DataFrame:
    """Transforms the inbound customers data to the feature store format.

    Args:
        customers_data: A dictionary containing the customers data.

    Returns:
        pd.DataFrame: A Pandas DataFrame containing the processed customers data.
    """
    customers_df = pd.DataFrame.from_dict(customers_data)

    # (3) convert cat columns to lowercase
    customers_df = customers_df.applymap(lambda s: s.lower() if type(s) == str else s)

    # (4) drop customer_zip
    customers_df.drop("customer_zip", axis=1, inplace=True)

    # (5-6) one hot encode
    one_hot_encoder(customers_df, "customer_gender", ["unkown", "male", "female", "other"])
    one_hot_encoder(customers_df, "policy_state", ["wa", "ca", "az", "or", "nv", "id"])

    # (7-8) ordinal encode
    customers_df["customer_education"] = customers_df["customer_education"].replace(
        {
            "below high school": 0,
            "high school": 1,
            "associate": 2,
            "bachelor": 3,
            "advanced degree": 4,
        }
    )
    customers_df["policy_liability"] = customers_df["policy_liability"].replace(
        {"15/30": 0, "25/50": 1, "30/60": 2, "100/200": 3}
    )

    # NOTE: steps 9-18 in Flow file involve casting encoded columns from Float to Long, which is not
    # necessary here.

    # (19) create event_time
    customers_df["event_time"] = pd.to_datetime("now").timestamp()

    # (20-21) drop unused columns
    customers_df.drop("customer_gender_unkown", axis=1, inplace=True)
    customers_df.drop("customer_gender_other", axis=1, inplace=True)

    return customers_df


def ingest_df_to_feature_group(df, feature_group_name):
    """Ingest rows from a DataFrame into a Feature Group

    Args:
        df: pd.DataFrame
        feature_group_name: str

    Returns:
        None: records are written to the Feature Group via put_record
    """
    success, fail = 0, 0
    for row_num, row_series in df.astype(str).iterrows():
        record = []
        for key, value in row_series.to_dict().items():
            record.append({"FeatureName": key, "ValueAsString": str(value)})
        print(record)
        response = featurestore_runtime.put_record(
            FeatureGroupName=feature_group_name, Record=record
        )
        if response["ResponseMetadata"]["HTTPStatusCode"] == 200:
            success += 1
        else:
            fail += 1
    print(f"Success = {success}")
    print(f"Fail = {fail}")


def get_prediction(policy, target_variant):
    """Get records from Feature Groups and invoke SageMaker endpoint

    Args:
        policy: int or str

    Returns:
        dict to be used as a json response
    """
    feature_record = get_payload(policy)
    sm_response = runtime.invoke_endpoint(
        EndpointName=ENDPOINT_NAME,
        ContentType="text/csv",
        Accept="application/json",
        Body=feature_record,
        TargetVariant=target_variant,
    )
    result = json.loads(sm_response["Body"].read().decode())
    pred = result[0]
    return response({"prediction": pred}, 200)


def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))

    # If request came from API Gateway
    try:
        data = json.loads(event["body"])

    # Otherwise it's just a test case
    except:
        data = json.loads(json.dumps(event))

    policy = data["claim"]["policy_id"]["0"]
    target_variant = data["variant"]

    try:
        return get_prediction(policy, target_variant)
    except:
        # Get raw data from request
        claim = data["claim"]
        customer = data["customer"]
        # Transform data
        processed_claims_df = transform_claims_data(claim)
        processed_customers_df = transform_customers_data(customer)
        # Ingest newly processed records into Feature Groups
        ingest_df_to_feature_group(processed_claims_df, "claims-feature-group")
        ingest_df_to_feature_group(processed_customers_df, "customers-feature-group")
        # Return prediction
        return get_prediction(policy, target_variant)
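
Optionally, before packaging, you can smoke-test the handler locally from the notebook. This is a sketch that runs the same code path the Lambda will run, using the notebook role's credentials and assuming policy ID 1 was ingested into both Feature Groups earlier.

[ ]:
# Optional local smoke test of the handler we just wrote
import importlib
import lambda_function

importlib.reload(lambda_function)  # pick up the freshly written file
test_event = {"variant": "Variant1", "claim": {"policy_id": {"0": "1"}}}
print(lambda_function.lambda_handler(test_event, None))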

Upload the Lambda code to S3.

[ ]:
shutil.make_archive("function", "zip", ".", "lambda_function.py")
s3_bucket_uri = f"s3://{bucket}"

!aws s3 cp function.zip {s3_bucket_uri}

Use the helpers library to deploy what we call the ML Gateway pattern. This spins up an API Gateway endpoint attached to a Lambda function running the code you’ve seen above; the gateway ties together the SageMaker Feature Store and a model deployed as a SageMaker endpoint.

To deploy this ML Gateway pattern, you need to add the following permissions to your SageMaker execution role:

{
    "Effect": "Allow",
    "Action": [
        "apigateway:*"
    ],
    "Resource": [
        "*"
    ]
},
{
    "Effect": "Allow",
    "Action": [
        "lambda:GetLayerVersion"
    ],
    "Resource": [
        "*"
    ]
}

Alternatively, you can add the managed AWSLambdaFullAccess and AmazonAPIGatewayAdministrator policies to your SageMaker execution role, but keep in mind that these managed policies are overly permissive and should be reviewed for least privilege before production use.
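
If the credentials you are working with have IAM permissions (a default SageMaker execution role typically does not, in which case add the policy from the IAM console instead), the statements above could be attached as an inline policy programmatically. A sketch:

[ ]:
# Sketch: attach the inline policy shown above to the execution role with boto3.
# Requires iam:PutRolePolicy, which you may only have from an admin context.
import json

iam = boto3.client("iam")
role_name = role.split("/")[-1]  # role is an ARN; the role name is the last segment
inline_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": ["apigateway:*"], "Resource": ["*"]},
        {"Effect": "Allow", "Action": ["lambda:GetLayerVersion"], "Resource": ["*"]},
    ],
}
iam.put_role_policy(
    RoleName=role_name,
    PolicyName="ml-gateway-deploy-permissions",
    PolicyDocument=json.dumps(inline_policy),
)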

[ ]:
api_gateway_url = helpers.deploy_ml_gateway_pattern(endpoint_name, region, bucket)

With the API Gateway URL above, we can call our Feature Store-backed endpoint.

[ ]:
api_gateway_url
[ ]:
# Endpoint
url = api_gateway_url

# User request data
input_data = {
    "variant": "Variant1",
    "claim": {
        "policy_id": {"0": "999999999"},
        "driver_relationship": {"0": "Spouse"},
        "incident_type": {"0": "Collision"},
        "collision_type": {"0": "Front"},
        "incident_severity": {"0": "Minor"},
        "authorities_contacted": {"0": "None"},
        "num_vehicles_involved": {"0": 2},
        "num_injuries": {"0": 0},
        "num_witnesses": {"0": 0},
        "police_report_available": {"0": "No"},
        "injury_claim": {"0": 71600},
        "vehicle_claim": {"0": 8913.6687631788},
        "total_claim_amount": {"0": 80513.6687631788},
        "incident_month": {"0": 3},
        "incident_day": {"0": 17},
        "incident_dow": {"0": 6},
        "incident_hour": {"0": 8},
        "fraud": {"0": 0},
    },
    "customer": {
        "policy_id": {"0": "999999999"},
        "customer_age": {"0": 54},
        "months_as_customer": {"0": 94},
        "num_claims_past_year": {"0": 0},
        "num_insurers_past_5_years": {"0": 1},
        "policy_state": {"0": "WA"},
        "policy_deductable": {"0": 750},
        "policy_annual_premium": {"0": 3000},
        "policy_liability": {"0": "25/50"},
        "customer_zip": {"0": 99207},
        "customer_gender": {"0": "Unkown"},
        "customer_education": {"0": "Associate"},
        "auto_year": {"0": 2006},
    },
}

# Hit endpoint
r = requests.post(url, json=input_data)

# Print response
print(r.json())
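
Because the request above used a new policy ID (999999999), the Lambda takes the transform-and-ingest path on the first call. For a policy that already exists in both Feature Groups, for example one of the IDs ingested earlier, only the policy ID and the target variant are needed in the request.

[ ]:
# Request for a policy ID that already exists in the Feature Store
existing_policy_request = {
    "variant": "Variant2",
    "claim": {"policy_id": {"0": "1"}},
}
r = requests.post(url, json=existing_policy_request)
print(r.json())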

Clean Up

[ ]:
def clean_up():
    # Delete the Feature Groups (removes the online store)
    claims_feature_group.delete()
    customers_feature_group.delete()

    # Delete the offline Feature Group data in S3
    !aws s3 rm {claims_offline_feature_group_bucket} --recursive
    !aws s3 rm {customers_offline_feature_group_bucket} --recursive

    # Delete training and test data
    s3_prefix_uri = f"s3://{bucket}/{prefix}"
    !aws s3 rm {s3_prefix_uri} --recursive

    # Delete the Model Monitor schedule
    !aws sagemaker delete-monitoring-schedule --monitoring-schedule-name {monitor_schedule_name}

Uncomment the following cell to clean up the Feature Groups, the offline Feature Group data in S3, and the Model Monitor schedule.

[ ]:
#clean_up()
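
Note that `clean_up()` does not remove the SageMaker endpoint, its endpoint configuration, the two models, or the Lambda and API Gateway resources created by the helper. A sketch for the SageMaker-side resources, to run after the monitoring schedule (which references the endpoint) has been deleted:

[ ]:
# Sketch: remove the endpoint, its config (created with the same name as the endpoint), and both models
def clean_up_endpoint():
    sm = sess.sagemaker_client
    sm.delete_endpoint(EndpointName=endpoint_name)
    sm.delete_endpoint_config(EndpointConfigName=endpoint_name)
    sm.delete_model(ModelName=model1_name)
    sm.delete_model(ModelName=model2_name)

# clean_up_endpoint()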