Fraud Detection for Automobile Claims: Create an End to End Pipeline

Background

In this notebook, we will build a SageMaker Pipeline that automates the entire end-to-end process of preparing, training, and deploying a model that detects automobile claim fraud. For a more detailed explanation of each step of the pipeline, you can look at the series of notebooks (listed below) that implements this same process using a manual approach. Please see the README.md for more information about the use case implemented by this series of notebooks.

  1. Fraud Detection for Automobile Claims: Data Exploration

  2. Fraud Detection for Automobile Claims: Data Preparation, Process, and Store Features

  3. Fraud Detection for Automobile Claims: Train, Check Bias, Tune, Record Lineage, and Register a Model

  4. Fraud Detection for Automobile Claims: Mitigate Bias, Train, Register, and Deploy Unbiased Model

Contents

  1. Prerequisites

  2. Architecture: Create a SageMaker Pipeline to Automate All the Steps from Data Prep to Model Deployment

  3. Creating an Automated Pipeline using SageMaker Pipeline

  4. Clean-Up

## Prerequisites

Install and/or update the required third-party libraries

[ ]:
!python -m pip install -Uq pip
!python -m pip install -q awswrangler==2.2.0 imbalanced-learn==0.7.0 sagemaker==2.41.0 boto3==1.17.70

Import libraries

[ ]:
import json
import boto3
import pathlib
import sagemaker
import numpy as np
import pandas as pd
import awswrangler as wr
import string

import demo_helpers

from sagemaker.xgboost.estimator import XGBoost
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString

Set region and boto3 config

[ ]:
# You can change this to a region of your choice
region = sagemaker.Session().boto_region_name
print("Using AWS Region: {}".format(region))

boto3.setup_default_session(region_name=region)
boto_session = boto3.Session(region_name=region)

s3_client = boto3.client("s3", region_name=region)

sagemaker_boto_client = boto_session.client("sagemaker")
sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session, sagemaker_client=sagemaker_boto_client
)
sagemaker_role = sagemaker.get_execution_role()

account_id = boto3.client("sts").get_caller_identity()["Account"]

bucket = sagemaker_session.default_bucket()
prefix = "fraud-detect-demo"

claims_fg_name = f"{prefix}-claims"
customers_fg_name = f"{prefix}-customers"
[ ]:
# ======> Tons of output_paths

training_job_output_path = f"s3://{bucket}/{prefix}/training_jobs"
bias_report_output_path = f"s3://{bucket}/{prefix}/clarify-bias"
explainability_output_path = f"s3://{bucket}/{prefix}/clarify-explainability"

train_data_uri = f"s3://{bucket}/{prefix}/data/train/train.csv"
test_data_uri = f"s3://{bucket}/{prefix}/data/test/test.csv"
train_data_upsampled_s3_path = f"s3://{bucket}/{prefix}/data/train/upsampled/train.csv"
processing_dir = "/opt/ml/processing"
create_dataset_script_uri = f"s3://{bucket}/{prefix}/code/create_dataset.py"
pipeline_bias_output_path = f"s3://{bucket}/{prefix}/clarify-output/pipeline/bias"
deploy_model_script_uri = f"s3://{bucket}/{prefix}/code/deploy_model.py"

# ======> variables used for parameterizing the notebook run
flow_instance_count = 1
flow_instance_type = "ml.m5.4xlarge"

train_instance_count = 1
train_instance_type = "ml.m4.xlarge"

deploy_model_instance_type = "ml.m4.xlarge"

## Architecture: Create a SageMaker Pipeline to Automate All the Steps from Data Prep to Model Deployment

Figure: End-to-end pipeline architecture

## Creating an Automated Pipeline using SageMaker Pipeline

Now that you've manually done each step in our machine learning workflow, you can automate certain steps to allow for faster model experimentation without sacrificing transparency and model tracking. In this section you will create a pipeline which trains a new model, persists the model in SageMaker, and then adds the model to the registry.

Pipeline parameters

An important feature of SageMaker Pipelines is the ability to define the steps ahead of time and still change the parameters of those steps at execution time, without having to redefine the pipeline. You do this by using ParameterInteger, ParameterFloat, or ParameterString to define a value up front, which you can then override when you call pipeline.start(parameters=parameters) later. Only certain parameters can be defined this way.

[ ]:
train_instance_param = ParameterString(
    name="TrainingInstance",
    default_value="ml.m4.xlarge",
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
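
To make the relationship between declared defaults and execution-time overrides concrete, here is a minimal sketch that does not use the SDK. The `resolve_parameters` helper is hypothetical and only models the behavior conceptually: a declared default is used unless an override with the same name is supplied. It is not how the SageMaker service is implemented.

```python
# Conceptual sketch only: models how default parameter values and
# execution-time overrides combine. `resolve_parameters` is a hypothetical
# helper, not part of the SageMaker SDK.

# Defaults mirroring the two parameters declared above.
declared_defaults = {
    "TrainingInstance": "ml.m4.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
}


def resolve_parameters(defaults, overrides=None):
    """Return the effective parameter values for one pipeline execution."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        # Assumption for this sketch: only declared parameters may be overridden.
        raise KeyError(f"Undeclared pipeline parameters: {sorted(unknown)}")
    # Later keys win, so overrides replace defaults with the same name.
    return {**defaults, **overrides}


effective = resolve_parameters(declared_defaults, {"TrainingInstance": "ml.m5.xlarge"})
print(effective)
```

The override dictionary plays the role of the `parameters` argument passed to `pipeline.start` later in this notebook; parameters that are not overridden keep their defaults.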

Step 1: Claims Data Wrangler Preprocessing Step

Upload raw data to S3

Before you can preprocess the raw data with Data Wrangler, it must exist in S3.

[ ]:
s3_client.upload_file(
    Filename="data/claims.csv", Bucket=bucket, Key=f"{prefix}/data/raw/claims.csv"
)
s3_client.upload_file(
    Filename="data/customers.csv", Bucket=bucket, Key=f"{prefix}/data/raw/customers.csv"
)

Update attributes within the .flow file

Data Wrangler generates a .flow file that contains a reference to the S3 bucket used during the wrangling. This bucket may differ from the default bucket in this notebook. For example, if someone else did the wrangling, you will probably not have access to their bucket, so you need to point the .flow file to your own S3 bucket before you can load it into Data Wrangler or access the data.

After running the cell below, you can open the claims.flow and customers.flow files and export the data to S3, or you can continue the guide using the provided data/claims_preprocessed.csv and data/customers_preprocessed.csv files.

[ ]:
claims_flow_template_file = "claims_flow_template"

with open(claims_flow_template_file, "r") as f:
    variables = {"bucket": bucket, "prefix": prefix}
    template = string.Template(f.read())
    claims_flow = template.substitute(variables)
    claims_flow = json.loads(claims_flow)

with open("claims.flow", "w") as f:
    json.dump(claims_flow, f)

customers_flow_template_file = "customers_flow_template"

with open(customers_flow_template_file, "r") as f:
    variables = {"bucket": bucket, "prefix": prefix}
    template = string.Template(f.read())
    customers_flow = template.substitute(variables)
    customers_flow = json.loads(customers_flow)

with open("customers.flow", "w") as f:
    json.dump(customers_flow, f)
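
The substitution above relies on `string.Template` from the standard library. The following sketch shows the mechanism on a heavily simplified, hypothetical template; the placeholder syntax (`${bucket}`, `${prefix}`), the node contents, and the bucket name are assumptions made for illustration and may not match the real template files.

```python
import json
import string

# Hypothetical, heavily simplified stand-in for a flow template. The real
# claims_flow_template file contains many more nodes and fields.
template_text = """
{
  "nodes": [
    {
      "node_id": "example-node",
      "parameters": {
        "dataset_definition": {
          "name": "claims.csv",
          "s3ExecutionContext": {"s3Uri": "s3://${bucket}/${prefix}/data/raw/claims.csv"}
        }
      }
    }
  ]
}
"""

# Example values; in the notebook these come from the SageMaker session.
variables = {"bucket": "my-example-bucket", "prefix": "fraud-detect-demo"}

# substitute() raises KeyError if a placeholder has no matching variable,
# which surfaces template mistakes before the flow is uploaded.
flow = json.loads(string.Template(template_text).substitute(variables))

s3_uri = flow["nodes"][0]["parameters"]["dataset_definition"]["s3ExecutionContext"]["s3Uri"]
print(s3_uri)
```

Parsing the substituted text with `json.loads` before writing it back out, as the cell above does, also confirms that the result is still valid JSON.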

Upload flow to S3

This will become an input to the first step and, as such, needs to be in S3.

[ ]:
s3_client.upload_file(
    Filename="claims.flow", Bucket=bucket, Key=f"{prefix}/dataprep-notebooks/claims.flow"
)
claims_flow_uri = f"s3://{bucket}/{prefix}/dataprep-notebooks/claims.flow"
print("Claims flow file uploaded to S3")

Define the first Data Wrangler step’s inputs

[ ]:
with open("claims.flow", "r") as f:
    claims_flow = json.load(f)

flow_step_inputs = []

# flow file contains the code for each transformation
flow_file_input = sagemaker.processing.ProcessingInput(
    source=claims_flow_uri, destination=f"{processing_dir}/flow", input_name="flow"
)

flow_step_inputs.append(flow_file_input)

# parse the flow file for S3 inputs to Data Wrangler job
for node in claims_flow["nodes"]:
    if "dataset_definition" in node["parameters"]:
        data_def = node["parameters"]["dataset_definition"]
        name = data_def["name"]
        s3_input = sagemaker.processing.ProcessingInput(
            source=data_def["s3ExecutionContext"]["s3Uri"],
            destination=f"{processing_dir}/{name}",
            input_name=name,
        )
        flow_step_inputs.append(s3_input)
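
To see which nodes the loop above picks up, the sketch below runs the same selection logic on a small, hypothetical flow dictionary and collects plain tuples instead of `ProcessingInput` objects. The node IDs, the `operator_id` value, and the bucket name are invented for illustration; only the keys that the loop reads are taken from the code above.

```python
# Hypothetical flow with one source node and one transform node. Only the
# keys read by the parsing loop are included.
example_flow = {
    "nodes": [
        {
            "node_id": "source-node",
            "parameters": {
                "dataset_definition": {
                    "name": "claims.csv",
                    "s3ExecutionContext": {"s3Uri": "s3://example-bucket/raw/claims.csv"},
                }
            },
        },
        {"node_id": "transform-node", "parameters": {"operator_id": "example-transform"}},
    ]
}

processing_dir = "/opt/ml/processing"

# Collect (input name, S3 source, container destination) for each source node.
s3_inputs = []
for node in example_flow["nodes"]:
    data_def = node["parameters"].get("dataset_definition")
    if data_def is None:
        continue  # nodes without a dataset definition contribute no S3 input
    s3_inputs.append(
        (
            data_def["name"],
            data_def["s3ExecutionContext"]["s3Uri"],
            f"{processing_dir}/{data_def['name']}",
        )
    )

print(s3_inputs)
```

Only the source node yields an input, so the processing job receives the flow file plus one S3 input per dataset referenced in the flow.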

Define outputs for first Data Wrangler step

[ ]:
claims_output_name = (
    f"{claims_flow['nodes'][-1]['node_id']}.{claims_flow['nodes'][-1]['outputs'][0]['name']}"
)

flow_step_outputs = []

flow_output = sagemaker.processing.ProcessingOutput(
    output_name=claims_output_name,
    feature_store_output=sagemaker.processing.FeatureStoreOutput(feature_group_name=claims_fg_name),
    app_managed=True,
)

flow_step_outputs.append(flow_output)

Define processor and processing step

[ ]:
# You can find the proper image uri by exporting your Data Wrangler flow to a pipeline notebook
# =================================
from sagemaker import image_uris

# Pulls the latest data-wrangler container tag, i.e. "1.x"
# The latest tested container version was "1.11.0"
image_uri = image_uris.retrieve(framework="data-wrangler", region=region)

print("image_uri: {}".format(image_uri))

flow_processor = sagemaker.processing.Processor(
    role=sagemaker_role,
    image_uri=image_uri,
    instance_count=flow_instance_count,
    instance_type=flow_instance_type,
    max_runtime_in_seconds=86400,
)

output_content_type = "CSV"

# Output configuration used as processing job container arguments
claims_output_config = {claims_output_name: {"content_type": output_content_type}}

claims_flow_step = ProcessingStep(
    name="ClaimsDataWranglerProcessingStep",
    processor=flow_processor,
    inputs=flow_step_inputs,
    outputs=flow_step_outputs,
    job_arguments=[f"--output-config '{json.dumps(claims_output_config)}'"],
)
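
The output name and the `--output-config` argument are built purely with string formatting and `json.dumps`, so their shape can be checked without running a processing job. The sketch below uses a hypothetical final node; real node IDs are generated by Data Wrangler and will look different.

```python
import json

# Hypothetical final node of a flow; the ID and output name are made up.
last_node = {"node_id": "example-node-id", "outputs": [{"name": "default"}]}

# Same "<node_id>.<output name>" convention as claims_output_name above.
output_name = f"{last_node['node_id']}.{last_node['outputs'][0]['name']}"
output_config = {output_name: {"content_type": "CSV"}}

# The flag and its JSON value form one string, as in the step definition above.
job_argument = f"--output-config '{json.dumps(output_config)}'"
print(job_argument)
```

Printing the argument like this is a quick way to confirm that the output name in the configuration matches the `output_name` of the `ProcessingOutput`.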

Step 2: Customers Data Wrangler Preprocessing Step

[ ]:
s3_client.upload_file(
    Filename="customers.flow", Bucket=bucket, Key=f"{prefix}/dataprep-notebooks/customers.flow"
)
customers_flow_uri = f"s3://{bucket}/{prefix}/dataprep-notebooks/customers.flow"
print("Customers flow file uploaded to S3")
[ ]:
with open("customers.flow", "r") as f:
    customers_flow = json.load(f)

flow_step_inputs = []

# flow file contains the code for each transformation
flow_file_input = sagemaker.processing.ProcessingInput(
    source=customers_flow_uri, destination=f"{processing_dir}/flow", input_name="flow"
)

flow_step_inputs.append(flow_file_input)

# parse the flow file for S3 inputs to Data Wrangler job
for node in customers_flow["nodes"]:
    if "dataset_definition" in node["parameters"]:
        data_def = node["parameters"]["dataset_definition"]
        name = data_def["name"]
        s3_input = sagemaker.processing.ProcessingInput(
            source=data_def["s3ExecutionContext"]["s3Uri"],
            destination=f"{processing_dir}/{name}",
            input_name=name,
        )
        flow_step_inputs.append(s3_input)
[ ]:
customers_output_name = (
    f"{customers_flow['nodes'][-1]['node_id']}.{customers_flow['nodes'][-1]['outputs'][0]['name']}"
)

flow_step_outputs = []

flow_output = sagemaker.processing.ProcessingOutput(
    output_name=customers_output_name,
    feature_store_output=sagemaker.processing.FeatureStoreOutput(
        feature_group_name=customers_fg_name
    ),
    app_managed=True,
)

flow_step_outputs.append(flow_output)

output_content_type = "CSV"

# Output configuration used as processing job container arguments
customers_output_config = {customers_output_name: {"content_type": output_content_type}}

customers_flow_step = ProcessingStep(
    name="CustomersDataWranglerProcessingStep",
    processor=flow_processor,
    inputs=flow_step_inputs,
    outputs=flow_step_outputs,
    job_arguments=[f"--output-config '{json.dumps(customers_output_config)}'"],
)

Step 3: Create Dataset and Train/Test Split

[ ]:
s3_client.upload_file(
    Filename="create_dataset.py", Bucket=bucket, Key=f"{prefix}/code/create_dataset.py"
)

create_dataset_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sagemaker_role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="fraud-detection-demo-create-dataset",
    sagemaker_session=sagemaker_session,
)

create_dataset_step = ProcessingStep(
    name="CreateDataset",
    processor=create_dataset_processor,
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="train_data", source="/opt/ml/processing/output/train"
        ),
        sagemaker.processing.ProcessingOutput(
            output_name="test_data", source="/opt/ml/processing/output/test"
        ),
    ],
    job_arguments=[
        "--claims-feature-group-name",
        claims_fg_name,
        "--customers-feature-group-name",
        customers_fg_name,
        "--bucket-name",
        bucket,
        "--bucket-prefix",
        prefix,
        "--region",
        region,
    ],
    code=create_dataset_script_uri,
    depends_on=[claims_flow_step.name, customers_flow_step.name],
)

Step 4: Train XGBoost Model

In this step we use the ParameterString train_instance_param defined at the beginning of the pipeline.

[ ]:
hyperparameters = {
    "max_depth": "3",
    "eta": "0.2",
    "objective": "binary:logistic",
    "num_round": "100",
}

xgb_estimator = XGBoost(
    entry_point="xgboost_starter_script.py",
    output_path=training_job_output_path,
    code_location=training_job_output_path,
    hyperparameters=hyperparameters,
    role=sagemaker_role,
    instance_count=train_instance_count,
    instance_type=train_instance_param,
    framework_version="1.0-1",
)

train_step = TrainingStep(
    name="XgboostTrain",
    estimator=xgb_estimator,
    inputs={
        "train": sagemaker.inputs.TrainingInput(
            s3_data=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                "train_data"
            ].S3Output.S3Uri
        )
    },
)

Step 5: Model Pre-Deployment Step

[ ]:
model = sagemaker.model.Model(
    name="fraud-detection-demo-pipeline-xgboost",
    image_uri=train_step.properties.AlgorithmSpecification.TrainingImage,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=sagemaker_role,
)

inputs = sagemaker.inputs.CreateModelInput(instance_type="ml.m4.xlarge")

create_model_step = CreateModelStep(name="ModelPreDeployment", model=model, inputs=inputs)

Step 6: Run Bias Metrics with Clarify

Clarify configuration

[ ]:
bias_data_config = sagemaker.clarify.DataConfig(
    s3_data_input_path=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
        "train_data"
    ].S3Output.S3Uri,
    s3_output_path=pipeline_bias_output_path,
    label="fraud",
    dataset_type="text/csv",
)

bias_config = sagemaker.clarify.BiasConfig(
    label_values_or_threshold=[0],
    facet_name="customer_gender_female",
    facet_values_or_threshold=[1],
)

analysis_config = bias_data_config.get_config()
analysis_config.update(bias_config.get_config())
analysis_config["methods"] = {"pre_training_bias": {"methods": "all"}}

clarify_config_dir = pathlib.Path("config")
clarify_config_dir.mkdir(exist_ok=True)
with open(clarify_config_dir / "analysis_config.json", "w") as f:
    json.dump(analysis_config, f)

s3_client.upload_file(
    Filename="config/analysis_config.json",
    Bucket=bucket,
    Key=f"{prefix}/clarify-config/analysis_config.json",
)
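
The configuration above requests all pre-training bias metrics. As a rough illustration of what two of them measure, the sketch below computes class imbalance (CI) and the difference in proportions of labels (DPL) by hand on made-up rows. It assumes the definitions from the SageMaker Clarify documentation as we understand them: facet d is the group selected by `facet_values_or_threshold`, facet a is everyone else, and the positive label is the value given in `label_values_or_threshold`. The data is invented and the code is not what Clarify runs internally.

```python
# Toy rows of (customer_gender_female, fraud); the values are made up.
rows = [(1, 0), (1, 1), (0, 0), (0, 0), (0, 0), (0, 1), (1, 0), (0, 0)]

FACET_VALUE = 1     # mirrors facet_values_or_threshold=[1]
POSITIVE_LABEL = 0  # mirrors label_values_or_threshold=[0]

# Split the labels by facet membership.
facet_d = [label for facet, label in rows if facet == FACET_VALUE]
facet_a = [label for facet, label in rows if facet != FACET_VALUE]
n_d, n_a = len(facet_d), len(facet_a)

# Class imbalance: how unevenly the two facets are represented in the data.
class_imbalance = (n_a - n_d) / (n_a + n_d)

# Proportion of positive labels within each facet.
q_d = sum(label == POSITIVE_LABEL for label in facet_d) / n_d
q_a = sum(label == POSITIVE_LABEL for label in facet_a) / n_a

# Difference in proportions of labels between the two facets.
dpl = q_a - q_d

print(f"CI = {class_imbalance:.3f}, DPL = {dpl:.3f}")
```

Values near zero indicate that the facets are similarly represented (CI) and receive the positive label at similar rates (DPL); the report written by the Clarify step contains these and further metrics for the real training data.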

Clarify processing step

[ ]:
clarify_processor = sagemaker.processing.Processor(
    base_job_name="fraud-detection-demo-clarify-processor",
    image_uri=sagemaker.clarify.image_uris.retrieve(framework="clarify", region=region),
    role=sagemaker.get_execution_role(),
    instance_count=1,
    instance_type="ml.c5.xlarge",
)

clarify_step = ProcessingStep(
    name="ClarifyProcessor",
    processor=clarify_processor,
    inputs=[
        sagemaker.processing.ProcessingInput(
            input_name="analysis_config",
            source=f"s3://{bucket}/{prefix}/clarify-config/analysis_config.json",
            destination="/opt/ml/processing/input/config",
        ),
        sagemaker.processing.ProcessingInput(
            input_name="dataset",
            source=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                "train_data"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/input/data",
        ),
    ],
    outputs=[
        sagemaker.processing.ProcessingOutput(
            source="/opt/ml/processing/output/analysis.json",
            destination=pipeline_bias_output_path,
            output_name="analysis_result",
        )
    ],
)

Step 7: Register Model

In this step you will use the ParameterString model_approval_status defined at the outset of the pipeline code.

[ ]:
mpg_name = prefix

model_metrics = demo_helpers.ModelMetrics(
    bias=sagemaker.model_metrics.MetricsSource(
        s3_uri=clarify_step.properties.ProcessingOutputConfig.Outputs[
            "analysis_result"
        ].S3Output.S3Uri,
        content_type="application/json",
    )
)

register_step = RegisterModel(
    name="XgboostRegisterModel",
    estimator=xgb_estimator,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=mpg_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

Step 8: Deploy Model

[ ]:
s3_client.upload_file(
    Filename="deploy_model.py", Bucket=bucket, Key=f"{prefix}/code/deploy_model.py"
)

deploy_model_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sagemaker_role,
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name="fraud-detection-demo-deploy-model",
    sagemaker_session=sagemaker_session,
)

deploy_step = ProcessingStep(
    name="DeployModel",
    processor=deploy_model_processor,
    job_arguments=[
        "--model-name",
        create_model_step.properties.ModelName,
        "--region",
        region,
        "--endpoint-instance-type",
        deploy_model_instance_type,
        "--endpoint-name",
        "xgboost-model-pipeline-0120",
    ],
    code=deploy_model_script_uri,
)
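
The entries in `job_arguments` reach the processing script as command-line arguments. The deploy_model.py script itself is not shown in this notebook, so the following is only a sketch of how such a script might read them with `argparse`; the `parse_args` function, the defaults, and the example model name are assumptions.

```python
import argparse


def parse_args(argv=None):
    """Parse the flags that the DeployModel step passes to the script."""
    parser = argparse.ArgumentParser(description="Hypothetical deploy script arguments")
    parser.add_argument("--model-name", type=str, required=True)
    parser.add_argument("--region", type=str, required=True)
    parser.add_argument("--endpoint-instance-type", type=str, default="ml.m4.xlarge")
    parser.add_argument("--endpoint-name", type=str, required=True)
    return parser.parse_args(argv)


# Example invocation. At run time the model name is resolved from
# create_model_step.properties.ModelName; the value here is made up.
args = parse_args(
    [
        "--model-name", "example-model-name",
        "--region", "us-east-1",
        "--endpoint-instance-type", "ml.m4.xlarge",
        "--endpoint-name", "xgboost-model-pipeline-0120",
    ]
)
print(args.model_name, args.endpoint_name)
```

Note that `argparse` converts dashes in flag names to underscores, so `--endpoint-instance-type` becomes `args.endpoint_instance_type`.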

Step 9: Combine and Run the Pipeline Steps

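Although listing the parameters and steps in order makes the pipeline easier to reason about, they don't need to be in order. The pipeline resolves the execution order from the dependencies between steps, which form a directed acyclic graph (DAG).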

[ ]:
pipeline_name = "FraudDetectDemo"
%store pipeline_name

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[train_instance_param, model_approval_status],
    steps=[
        claims_flow_step,
        customers_flow_step,
        create_dataset_step,
        train_step,
        create_model_step,
        clarify_step,
        register_step,
        deploy_step,
    ],
)
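
To illustrate why the order of the `steps` list does not matter, the sketch below derives a valid execution order from the dependencies alone, using `graphlib` from the Python standard library (Python 3.9 or later). The dependency map is our reading of the step definitions in this notebook (explicit `depends_on` entries and references to another step's `properties`); it is an illustration, not the resolution logic of the SageMaker service.

```python
from graphlib import TopologicalSorter  # available in Python 3.9+

# Each step name maps to the set of steps it depends on. The entries are
# deliberately listed out of execution order.
dependencies = {
    "DeployModel": {"ModelPreDeployment"},
    "XgboostRegisterModel": {"XgboostTrain", "ClarifyProcessor"},
    "ClarifyProcessor": {"CreateDataset"},
    "ModelPreDeployment": {"XgboostTrain"},
    "XgboostTrain": {"CreateDataset"},
    "CreateDataset": {
        "ClaimsDataWranglerProcessingStep",
        "CustomersDataWranglerProcessingStep",
    },
}

# static_order() yields every step after all of its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())

for position, step_name in enumerate(execution_order, start=1):
    print(position, step_name)
```

Several orders are valid for this graph, for example the two Data Wrangler steps can run in either order or in parallel, so the exact sequence printed may vary.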

Submit the pipeline definition to the SageMaker Pipeline service

Note: If an existing pipeline has the same name it will be overwritten.

[ ]:
pipeline.upsert(role_arn=sagemaker_role)

View the entire pipeline definition

Viewing the pipeline definition with all the string variables interpolated may help you debug the pipeline. The output can be very long.

[ ]:
json.loads(pipeline.describe()["PipelineDefinition"])

Run the pipeline

Note that this will take about 23 minutes to complete. You can watch the progress of the pipeline execution in the SageMaker Studio Components panel.

[ ]:
# Special pipeline parameters can be defined or changed here
parameters = {"TrainingInstance": "ml.m5.xlarge"}
[ ]:
start_response = pipeline.start(parameters=parameters)
[ ]:
start_response.wait(delay=60, max_attempts=500)
start_response.describe()
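
The `delay` and `max_attempts` arguments describe a polling loop: check the status, sleep, and give up after a fixed number of attempts. The sketch below shows that pattern in isolation with a fake status source. The `wait_for_completion` function and the injectable `sleep` argument are hypothetical and are not the SDK's implementation; the status strings are the pipeline execution statuses as we understand them.

```python
import time

# Statuses after which polling can stop.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_completion(get_status, delay=60, max_attempts=500, sleep=time.sleep):
    """Poll get_status() until a terminal status is seen or attempts run out."""
    for _ in range(max_attempts):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        sleep(delay)  # wait before the next check
    raise TimeoutError(f"No terminal status after {max_attempts} attempts")


# Fake status source standing in for repeated describe() calls.
fake_statuses = iter(["Executing", "Executing", "Succeeded"])

# The sleep function is replaced with a no-op so the example finishes instantly.
final_status = wait_for_completion(
    lambda: next(fake_statuses), delay=60, max_attempts=5, sleep=lambda seconds: None
)
print(final_status)
```

With `delay=60` and `max_attempts=500`, the call in the cell above waits for up to 500 minutes, comfortably more than the expected run time of this pipeline.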

After completion, the pipeline will look something like this:

Figure: Pipeline

## Clean Up

After running the demo, you should remove the resources which were created. You can also delete all the objects in the project’s S3 directory by passing the keyword argument delete_s3_objects=True.

[ ]:
from demo_helpers import delete_project_resources
[ ]:
delete_project_resources(
    sagemaker_boto_client=sagemaker_boto_client,
    pipeline_name=pipeline_name,
    mpg_name=mpg_name,
    prefix=prefix,
    delete_s3_objects=False,
    bucket_name=bucket,
)