Fraud Detection for Automobile Claims: Mitigate Bias, Train, Register, and Deploy Unbiased Model
This notebook’s CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.
Background
This notebook is the fourth part of a series of notebooks that demonstrate how to prepare, train, and deploy a model that detects fraudulent auto claims. In this notebook, we describe how to detect bias with Clarify, mitigate it with SMOTE, train another model, and register it in the Model Registry along with the lineage of the artifacts created along the way: data, code, and model metadata. You can choose to run this notebook by itself or in sequence with the other notebooks listed below. Please see the README.md for more information about this use case implemented by this series of notebooks.
Fraud Detection for Automobile Claims: Data Preparation, Process, and Store Features
Fraud Detection for Automobile Claims: Train, Check Bias, Tune, Record Lineage, and Register a Model
Fraud Detection for Automobile Claims: Mitigate Bias, Train, Register, and Deploy Unbiased Model
Contents
Install required and/or update third-party libraries
[3]:
!python -m pip install -Uq pip
!python -m pip install -q awswrangler imbalanced-learn==0.7.0 sagemaker==2.23.0 boto3==1.17.70
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
daal4py 2021.3.0 requires daal==2021.2.3, which is not installed.
awscli 1.27.153 requires botocore==1.29.153, but you have botocore 1.20.112 which is incompatible.
awscli 1.27.153 requires s3transfer<0.7.0,>=0.6.0, but you have s3transfer 0.4.2 which is incompatible.
numba 0.54.1 requires numpy<1.21,>=1.17, but you have numpy 1.22.4 which is incompatible.
sagemaker-datawrangler 0.4.3 requires sagemaker-data-insights==0.4.0, but you have sagemaker-data-insights 0.3.3 which is incompatible.
sagemaker-studio-analytics-extension 0.0.19 requires boto3<2.0,>=1.26.49, but you have boto3 1.17.70 which is incompatible.
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
Import libraries
[5]:
import json
import time
import boto3
import sagemaker
import numpy as np
import pandas as pd
import awswrangler as wr
import matplotlib.pyplot as plt
from imblearn.over_sampling import SMOTE
from sagemaker.xgboost.estimator import XGBoost
from model_package_src.inference_specification import InferenceSpecification
%matplotlib inline
Set region, boto3 and SageMaker SDK variables
[6]:
# You can change this to a region of your choice
import sagemaker
region = sagemaker.Session().boto_region_name
print("Using AWS Region: {}".format(region))
Using AWS Region: us-east-1
[7]:
boto3.setup_default_session(region_name=region)
boto_session = boto3.Session(region_name=region)
s3_client = boto3.client("s3", region_name=region)
sagemaker_boto_client = boto_session.client("sagemaker")
sagemaker_session = sagemaker.Session(
boto_session=boto_session, sagemaker_client=sagemaker_boto_client
)
sagemaker_role = sagemaker.get_execution_role()
account_id = boto3.client("sts").get_caller_identity()["Account"]
[18]:
# variables used for parameterizing the notebook run
bucket = sagemaker_session.default_bucket()
prefix = "fraud-detect-demo"
claims_fg_name = f"{prefix}-claims"
customers_fg_name = f"{prefix}-customers"
model_2_name = f"{prefix}-xgboost-post-smote"
train_data_upsampled_s3_path = f"s3://{bucket}/{prefix}/data/train/upsampled/train.csv"
bias_report_2_output_path = f"s3://{bucket}/{prefix}/clarify-output/bias-2"
explainability_output_path = f"s3://{bucket}/{prefix}/clarify-output/explainability"
train_instance_count = 1
train_instance_type = "ml.m4.xlarge"
clarify_instance_count = 1
clarify_instance_type = "ml.c5.xlarge"
[19]:
train_data_upsampled_s3_path
[19]:
's3://sagemaker-us-east-1-527468462420/fraud-detect-demo/data/train/upsampled/train.csv'
## Architecture: Train, Check Bias, Tune, Record Lineage, Register Model
## Develop an Unbiased Model
For this second model, you will fix the gender imbalance in the dataset using SMOTE and train another model using XGBoost. This model will also be saved to the registry and eventually approved for deployment.
[10]:
train = pd.read_csv("data/train.csv")
test = pd.read_csv("data/test.csv")
Resolve class imbalance using SMOTE
To handle the imbalance, we can over-sample (i.e. upsample) the minority class using SMOTE (Synthetic Minority Over-sampling Technique). After installing the imbalanced-learn module, if you receive an ImportError when importing SMOTE, then try restarting the kernel.
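Conceptually, SMOTE creates each synthetic minority-class sample by picking one of a minority sample's nearest minority-class neighbors and interpolating between the two points. The cell below is a minimal NumPy sketch of that interpolation step, for intuition only; the actual imbalanced-learn implementation additionally handles neighbor search across the full feature matrix and various edge cases.
[ ]:
import numpy as np

rng = np.random.default_rng(42)

# toy example: a minority-class sample and one of its nearest minority-class neighbors
x_i = np.array([2.0, 5.0])
x_nn = np.array([3.0, 4.0])

# SMOTE draws a random gap in [0, 1) and places the synthetic point on the segment between them
lam = rng.random()
x_synthetic = x_i + lam * (x_nn - x_i)
print("Synthetic sample:", x_synthetic)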
Gender balance before SMOTE
[11]:
gender = train["customer_gender_female"]
gender.value_counts()
[11]:
0 10031
1 5969
Name: customer_gender_female, dtype: int64
Gender balance after SMOTE
[12]:
sm = SMOTE(random_state=42)
train_data_upsampled, gender_res = sm.fit_resample(train, gender)
train_data_upsampled["customer_gender_female"].value_counts()
[12]:
1 10031
0 10031
Name: customer_gender_female, dtype: int64
Set the hyperparameters
These are the parameters which will be sent to our training script in order to train the model. Although they are all defined as “hyperparameters” here, they can encompass XGBoost’s Learning Task Parameters, Tree Booster Parameters, or any other parameters you’d like to configure for XGBoost.
[13]:
hyperparameters = {
"max_depth": "3",
"eta": "0.2",
"objective": "binary:logistic",
"num_round": "100",
}
Train new model
[14]:
train_data_upsampled.to_csv("data/upsampled_train.csv", index=False)
s3_client.upload_file(
Filename="data/upsampled_train.csv",
Bucket=bucket,
Key=f"{prefix}/data/train/upsampled/train.csv",
)
[15]:
xgb_estimator = XGBoost(
entry_point="xgboost_starter_script.py",
hyperparameters=hyperparameters,
role=sagemaker_role,
instance_count=train_instance_count,
instance_type=train_instance_type,
framework_version="1.0-1",
)
[16]:
if "training_job_2_name" not in locals():
xgb_estimator.fit(inputs={"train": train_data_upsampled_s3_path})
training_job_2_name = xgb_estimator.latest_training_job.job_name
else:
print(f"Using previous training job: {training_job_2_name}")
Register artifacts
[ ]:
training_job_2_info = sagemaker_boto_client.describe_training_job(
TrainingJobName=training_job_2_name
)
Code artifact
[ ]:
# return any existing artifact which matches our training job's code S3 URI
code_s3_uri = training_job_2_info["HyperParameters"]["sagemaker_submit_directory"]
list_response = list(
sagemaker.lineage.artifact.Artifact.list(
source_uri=code_s3_uri, sagemaker_session=sagemaker_session
)
)
# use the existing artifact if it's already been created, otherwise create a new artifact
if list_response:
code_artifact = list_response[0]
print(f"Using existing artifact: {code_artifact.artifact_arn}")
else:
code_artifact = sagemaker.lineage.artifact.Artifact.create(
artifact_name="TrainingScript",
source_uri=code_s3_uri,
artifact_type="Code",
sagemaker_session=sagemaker_session,
)
print(f"Create artifact {code_artifact.artifact_arn}: SUCCESSFUL")
Training data artifact
[17]:
training_data_s3_uri = training_job_2_info["InputDataConfig"][0]["DataSource"]["S3DataSource"][
"S3Uri"
]
list_response = list(
sagemaker.lineage.artifact.Artifact.list(
source_uri=training_data_s3_uri, sagemaker_session=sagemaker_session
)
)
if list_response:
training_data_artifact = list_response[0]
print(f"Using existing artifact: {training_data_artifact.artifact_arn}")
else:
training_data_artifact = sagemaker.lineage.artifact.Artifact.create(
artifact_name="TrainingData",
source_uri=training_data_s3_uri,
artifact_type="Dataset",
sagemaker_session=sagemaker_session,
)
print(f"Create artifact {training_data_artifact.artifact_arn}: SUCCESSFUL")
Model artifact
[ ]:
trained_model_s3_uri = training_job_2_info["ModelArtifacts"]["S3ModelArtifacts"]
list_response = list(
sagemaker.lineage.artifact.Artifact.list(
source_uri=trained_model_s3_uri, sagemaker_session=sagemaker_session
)
)
if list_response:
model_artifact = list_response[0]
print(f"Using existing artifact: {model_artifact.artifact_arn}")
else:
model_artifact = sagemaker.lineage.artifact.Artifact.create(
artifact_name="TrainedModel",
source_uri=trained_model_s3_uri,
artifact_type="Model",
sagemaker_session=sagemaker_session,
)
print(f"Create artifact {model_artifact.artifact_arn}: SUCCESSFUL")
Set artifact associations
[ ]:
trial_component = sagemaker_boto_client.describe_trial_component(
TrialComponentName=training_job_2_name + "-aws-training-job"
)
trial_component_arn = trial_component["TrialComponentArn"]
Input artifacts
[ ]:
input_artifacts = [code_artifact, training_data_artifact]
for a in input_artifacts:
try:
sagemaker.lineage.association.Association.create(
source_arn=a.artifact_arn,
destination_arn=trial_component_arn,
association_type="ContributedTo",
sagemaker_session=sagemaker_session,
)
print(f"Associate {trial_component_arn} and {a.artifact_arn}: SUCCEESFUL\n")
except:
print(f"Association already exists between {trial_component_arn} and {a.artifact_arn}.\n")
Output artifacts
[ ]:
output_artifacts = [model_artifact]
for a in output_artifacts:
try:
sagemaker.lineage.association.Association.create(
source_arn=a.artifact_arn,
destination_arn=trial_component_arn,
association_type="Produced",
sagemaker_session=sagemaker_session,
)
print(f"Associate {trial_component_arn} and {a.artifact_arn}: SUCCEESFUL\n")
except:
print(f"Association already exists between {trial_component_arn} and {a.artifact_arn}.\n")
## Analyze Model for Bias and Explainability
Amazon SageMaker Clarify provides tools to help explain how machine learning (ML) models make predictions. These tools can help ML modelers, developers, and other internal stakeholders understand model characteristics as a whole prior to deployment and debug predictions provided by the model after it’s deployed. Transparency about how ML models arrive at their predictions is also critical to consumers and regulators, who need to trust the model predictions if they are going to accept the decisions based on them. SageMaker Clarify uses a model-agnostic feature attribution approach, which you can use to understand why a model made a prediction after training and to provide per-instance explanations during inference. The implementation includes a scalable and efficient implementation of SHAP (see the SHAP paper), based on the concept of a Shapley value from the field of cooperative game theory that assigns each feature an importance value for a particular prediction.
Create model from estimator
[ ]:
model_matches = sagemaker_boto_client.list_models(NameContains=model_2_name)['Models']
if not model_matches:
model_2 = sagemaker_session.create_model_from_job(
name=model_2_name,
training_job_name=training_job_2_info['TrainingJobName'],
role=sagemaker_role,
image_uri=training_job_2_info['AlgorithmSpecification']['TrainingImage'])
%store model_2_name
else:
print(f"Model {model_2_name} already exists.")
Check for data set bias and model bias
With SageMaker, we can check for pre-training and post-training bias. Pre-training metrics show pre-existing bias in the data, while post-training metrics show bias in the predictions from the model. Using the SageMaker SDK, we can specify which groups we want to check bias across and which metrics we’d like to show.
To run the full Clarify job, you must un-comment the code in the cell below. Running the job will take ~15 minutes. If you wish to save time, you can view the results in the next cell, which loads a pre-generated output if no bias job was run.
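For intuition about what a pre-training metric measures, the simplest one, class imbalance (CI), can be computed directly from the facet column as (n_a - n_d) / (n_a + n_d), where n_a and n_d are the counts of the advantaged and disadvantaged groups. The cell below is a quick sanity check on the upsampled training data (it should be close to zero after SMOTE); it is an illustration only and does not replace the Clarify report.
[ ]:
# sanity check: class imbalance (CI) of the gender facet in the upsampled training data
counts = train_data_upsampled["customer_gender_female"].value_counts()
n_adv, n_dis = counts.get(0, 0), counts.get(1, 0)  # male (0) vs. female (1) row counts
class_imbalance = (n_adv - n_dis) / (n_adv + n_dis)
print(f"Class imbalance after SMOTE: {class_imbalance:.4f}")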
[ ]:
clarify_processor = sagemaker.clarify.SageMakerClarifyProcessor(
role=sagemaker_role,
instance_count=1,
instance_type="ml.c4.xlarge",
sagemaker_session=sagemaker_session,
)
bias_data_config = sagemaker.clarify.DataConfig(
s3_data_input_path=train_data_upsampled_s3_path,
s3_output_path=bias_report_2_output_path,
label="fraud",
headers=train.columns.to_list(),
dataset_type="text/csv",
)
model_config = sagemaker.clarify.ModelConfig(
model_name=model_2_name,
instance_type=train_instance_type,
instance_count=1,
accept_type="text/csv",
)
predictions_config = sagemaker.clarify.ModelPredictedLabelConfig(probability_threshold=0.5)
bias_config = sagemaker.clarify.BiasConfig(
label_values_or_threshold=[0],
facet_name="customer_gender_female",
facet_values_or_threshold=[1],
)
# # un-comment the code below to run the whole job
# if 'clarify_bias_job_2_name' not in locals():
# clarify_processor.run_bias(
# data_config=bias_data_config,
# bias_config=bias_config,
# model_config=model_config,
# model_predicted_label_config=predictions_config,
# pre_training_methods='all',
# post_training_methods='all')
# clarify_bias_job_2_name = clarify_processor.latest_job.name
# %store clarify_bias_job_2_name
# else:
# print(f'Clarify job {clarify_bias_job_2_name} has already run successfully.')
## View Results of Clarify Job
Running Clarify on your dataset or model can take ~15 minutes. If you don’t have time to run the job, you can view the pre-generated results included with this demo. Otherwise, you can run the job by un-commenting the code in the cell above.
[ ]:
if "clarify_bias_job_2_name" in locals():
s3_client.download_file(
Bucket=bucket,
Key=f"{prefix}/clarify-output/bias-2/analysis.json",
Filename="clarify_output/bias_2/analysis.json",
)
print(f"Downloaded analysis from previous Clarify job: {clarify_bias_job_2_name}\n")
else:
print(f"Loading pre-generated analysis file...\n")
with open("clarify_output/bias_1/analysis.json", "r") as f:
bias_analysis = json.load(f)
results = bias_analysis["pre_training_bias_metrics"]["facets"]["customer_gender_female"][0][
"metrics"
][1]
print(json.dumps(results, indent=4))
with open("clarify_output/bias_2/analysis.json", "r") as f:
bias_analysis = json.load(f)
results = bias_analysis["pre_training_bias_metrics"]["facets"]["customer_gender_female"][0][
"metrics"
][1]
print(json.dumps(results, indent=4))
## Configure and Run Explainability Job
To run the full Clarify job, you must un-comment the code in the cell below. Running the job will take ~15 minutes. If you wish to save time, you can view the results in the next cell, which loads a pre-generated output if no explainability job was run.
[ ]:
model_config = sagemaker.clarify.ModelConfig(
model_name=model_2_name,
instance_type=train_instance_type,
instance_count=1,
accept_type="text/csv",
)
shap_config = sagemaker.clarify.SHAPConfig(
baseline=[train.median().values[1:].tolist()], num_samples=100, agg_method="mean_abs"
)
explainability_data_config = sagemaker.clarify.DataConfig(
s3_data_input_path=train_data_upsampled_s3_path,
s3_output_path=explainability_output_path,
label="fraud",
headers=train.columns.to_list(),
dataset_type="text/csv",
)
# un-comment the code below to run the whole job
# if "clarify_expl_job_name" not in locals():
# clarify_processor.run_explainability(
# data_config=explainability_data_config,
# model_config=model_config,
# explainability_config=shap_config)
# clarify_expl_job_name = clarify_processor.latest_job.name
# %store clarify_expl_job_name
# else:
# print(f'Clarify job {clarify_expl_job_name} has already run successfully.')
View Clarify explainability results (shortcut)
Running Clarify on your dataset or model can take ~15 minutes. If you don’t have time to run the job, you can view the pre-generated results included with this demo. Otherwise, you can run the job by un-commenting the code in the cell above.
[ ]:
if "clarify_expl_job_name" in locals():
s3_client.download_file(
Bucket=bucket,
Key=f"{prefix}/clarify-output/explainability/analysis.json",
Filename="clarify_output/explainability/analysis.json",
)
print(f"Downloaded analysis from previous Clarify job: {clarify_expl_job_name}\n")
else:
print(f"Loading pre-generated analysis file...\n")
with open("clarify_output/explainability/analysis.json", "r") as f:
analysis_result = json.load(f)
shap_values = pd.DataFrame(analysis_result["explanations"]["kernel_shap"]["label0"])
importances = shap_values["global_shap_values"].sort_values(ascending=False)
fig, ax = plt.subplots()
n = 5
y_pos = np.arange(n)
importance_scores = importances.values[:n]
y_label = importances.index[:n]
ax.barh(y_pos, importance_scores, align="center")
ax.set_yticks(y_pos)
ax.set_yticklabels(y_label)
ax.invert_yaxis()
ax.set_xlabel("SHAP Value (impact on model output)");
To see the autogenerated SageMaker Clarify report, run the following code and use the output link to open the report.
[ ]:
from IPython.display import FileLink, FileLinks
display(
"Click link below to view the SageMaker Clarify report", FileLink("clarify_output/report.pdf")
)
What is SHAP?
SHAP is the method used for calculating explanations in this solution. Unlike other feature attribution methods, such as single feature permutation, SHAP tries to disentangle the effect of a single feature by looking at all possible combinations of features.
SHAP (Lundberg et al. 2017) stands for SHapley Additive exPlanations. ‘Shapley’ relates to a game-theoretic concept called Shapley values that is used to create the explanations. A Shapley value describes the marginal contribution of each ‘player’ when considering all possible ‘coalitions’. Used in a machine learning context, a Shapley value describes the marginal contribution of each feature when considering all possible sets of features. ‘Additive’ relates to the fact that these Shapley values can be summed together to give the final model prediction.
As an example, we might start off with a baseline credit default risk of 10%. Given a set of features, we can calculate the Shapley value for each feature. Summing together all the Shapley values, we might obtain a cumulative value of +30%. Given the same set of features, we therefore expect our model to return a credit default risk of 40% (i.e. 10% + 30%).
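Because the values are additive, you can verify a prediction with a few lines of arithmetic. The numbers below are the illustrative figures from the paragraph above (a 10% baseline and Shapley contributions summing to +30%), not output from the Clarify job.
[ ]:
# illustrative check of SHAP additivity: prediction = baseline + sum of per-feature Shapley values
baseline_risk = 0.10                      # average model output over the baseline dataset
shap_contributions = [0.18, 0.07, 0.05]   # hypothetical per-feature Shapley values summing to +0.30
predicted_risk = baseline_risk + sum(shap_contributions)
print(f"Expected model prediction: {predicted_risk:.0%}")  # 40%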
## Create Model Package for the Trained Model
[ ]:
model_metrics_report = {"binary_classification_metrics": {}}
for metric in training_job_2_info["FinalMetricDataList"]:
stat = {metric["MetricName"]: {"value": metric["Value"], "standard_deviation": "NaN"}}
model_metrics_report["binary_classification_metrics"].update(stat)
with open("training_metrics.json", "w") as f:
json.dump(model_metrics_report, f)
metrics_s3_key = (
f"{prefix}/training_jobs/{training_job_2_info['TrainingJobName']}/training_metrics.json"
)
s3_client.upload_file(Filename="training_metrics.json", Bucket=bucket, Key=metrics_s3_key)
[ ]:
mp_inference_spec = InferenceSpecification().get_inference_specification_dict(
ecr_image=training_job_2_info["AlgorithmSpecification"]["TrainingImage"],
supports_gpu=False,
supported_content_types=["text/csv"],
supported_mime_types=["text/csv"],
)
mp_inference_spec["InferenceSpecification"]["Containers"][0]["ModelDataUrl"] = training_job_2_info[
"ModelArtifacts"
]["S3ModelArtifacts"]
[ ]:
model_metrics = {
"ModelQuality": {
"Statistics": {
"ContentType": "application/json",
"S3Uri": f"s3://{bucket}/{metrics_s3_key}",
}
},
"Bias": {
"Report": {
"ContentType": "application/json",
"S3Uri": f"{explainability_output_path}/analysis.json",
}
},
}
[ ]:
mpg_name = prefix
mp_input_dict = {
"ModelPackageGroupName": mpg_name,
"ModelPackageDescription": "XGBoost classifier to detect insurance fraud with SMOTE.",
"ModelApprovalStatus": "PendingManualApproval",
"ModelMetrics": model_metrics,
}
mp_input_dict.update(mp_inference_spec)
mp2_response = sagemaker_boto_client.create_model_package(**mp_input_dict)
mp2_arn = mp2_response["ModelPackageArn"]
[ ]:
mp_info = sagemaker_boto_client.describe_model_package(
ModelPackageName=mp2_response["ModelPackageArn"]
)
mp_status = mp_info["ModelPackageStatus"]
while mp_status not in ["Completed", "Failed"]:
time.sleep(5)
mp_info = sagemaker_boto_client.describe_model_package(
ModelPackageName=mp2_response["ModelPackageArn"]
)
mp_status = mp_info["ModelPackageStatus"]
print(f"model package status: {mp_status}")
print(f"model package status: {mp_status}")
View both models in the registry
[ ]:
sagemaker_boto_client.list_model_packages(ModelPackageGroupName=mpg_name)["ModelPackageSummaryList"]
## Architecture: Deploy and Serve Model
Now that we have trained a model, we can deploy and serve it. The following picture shows the architecture for doing so.
[ ]:
# variables used for parameterizing the notebook run
endpoint_name = f"{model_2_name}-endpoint"
endpoint_instance_count = 1
endpoint_instance_type = "ml.m4.xlarge"
predictor_instance_count = 1
predictor_instance_type = "ml.c5.xlarge"
batch_transform_instance_count = 1
batch_transform_instance_type = "ml.c5.xlarge"
## Deploy an Approved Model and Make a Prediction
In the real-life MLOps lifecycle, a model package gets approved after evaluation by data scientists, subject matter experts and auditors.
[ ]:
second_model_package = sagemaker_boto_client.list_model_packages(ModelPackageGroupName=mpg_name)[
"ModelPackageSummaryList"
][0]
model_package_update = {
"ModelPackageArn": second_model_package["ModelPackageArn"],
"ModelApprovalStatus": "Approved",
}
update_response = sagemaker_boto_client.update_model_package(**model_package_update)
Deploy the endpoint. This might take about 8 minutes.
[ ]:
primary_container = {"ModelPackageName": second_model_package["ModelPackageArn"]}
endpoint_config_name = f"{model_2_name}-endpoint-config"
existing_configs = len(
sagemaker_boto_client.list_endpoint_configs(NameContains=endpoint_config_name, MaxResults=30)[
"EndpointConfigs"
]
)
if existing_configs == 0:
create_ep_config_response = sagemaker_boto_client.create_endpoint_config(
EndpointConfigName=endpoint_config_name,
ProductionVariants=[
{
"InstanceType": endpoint_instance_type,
"InitialVariantWeight": 1,
"InitialInstanceCount": endpoint_instance_count,
"ModelName": model_2_name,
"VariantName": "AllTraffic",
}
],
)
[ ]:
existing_endpoints = sagemaker_boto_client.list_endpoints(
NameContains=endpoint_name, MaxResults=30
)["Endpoints"]
if not existing_endpoints:
create_endpoint_response = sagemaker_boto_client.create_endpoint(
EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
)
endpoint_info = sagemaker_boto_client.describe_endpoint(EndpointName=endpoint_name)
endpoint_status = endpoint_info["EndpointStatus"]
while endpoint_status == "Creating":
endpoint_info = sagemaker_boto_client.describe_endpoint(EndpointName=endpoint_name)
endpoint_status = endpoint_info["EndpointStatus"]
print("Endpoint status:", endpoint_status)
if endpoint_status == "Creating":
time.sleep(60)
## Run Predictions on Claims
Create a predictor
[ ]:
predictor = sagemaker.predictor.Predictor(
endpoint_name=endpoint_name, sagemaker_session=sagemaker_session
)
Sample a claim from the test data
[ ]:
dataset = pd.read_csv("data/dataset.csv")
train = dataset.sample(frac=0.8, random_state=0)
test = dataset.drop(train.index)
sample_policy_id = int(test.sample(1)["policy_id"])
[ ]:
test.info()
Get Multiple Claims
[ ]:
dataset = pd.read_csv("./data/claims_customer.csv")
col_order = ["fraud"] + list(dataset.drop(["fraud", "Unnamed: 0", "policy_id"], axis=1).columns)
col_order
Pull customer data and format the datapoint
When a customer submits an insurance claim online for instant approval, the insurance company needs to pull customer-specific data to add to the claim data. You can do this either with the customer data we have stored in CSV files or with an online feature store. The pulled data will serve as input for a model prediction.
The datapoint must then match the exact input format the model was trained on, with all features in the correct order. In this example, the col_order variable was saved when you created the train and test datasets earlier in the guide.
[ ]:
sample_policy_id = int(test.sample(1)["policy_id"])
pull_from_feature_store = False
if pull_from_feature_store:
customers_response = featurestore_runtime.get_record(
FeatureGroupName=customers_fg_name, RecordIdentifierValueAsString=str(sample_policy_id)
)
customer_record = customers_response["Record"]
customer_df = pd.DataFrame(customer_record).set_index("FeatureName")
claims_response = featurestore_runtime.get_record(
FeatureGroupName=claims_fg_name, RecordIdentifierValueAsString=str(sample_policy_id)
)
claims_record = claims_response["Record"]
claims_df = pd.DataFrame(claims_record).set_index("FeatureName")
blended_df = pd.concat([claims_df, customer_df]).loc[col_order].drop("fraud")
else:
customer_claim_df = dataset[dataset["policy_id"] == sample_policy_id].sample(1)
blended_df = customer_claim_df.loc[:, col_order].drop("fraud", axis=1).T.reset_index()
blended_df.columns = ["FeatureName", "ValueAsString"]
data_input = ",".join([str(x) for x in blended_df["ValueAsString"]])
data_input
Make prediction
[ ]:
results = predictor.predict(data_input, initial_args={"ContentType": "text/csv"})
prediction = json.loads(results)
print(f"Probablitity the claim from policy {int(sample_policy_id)} is fraudulent:", prediction)
Notebook CI Test Results
This notebook was tested in multiple regions. The test results are as follows, except for us-west-2, which is shown at the top of the notebook.