Amazon SageMaker Clarify Model Explainability Monitor for Batch Transform


This notebook’s CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.

This us-west-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable


Runtime

This notebook takes approximately 60 minutes to run.

Contents

Introduction

Amazon SageMaker Model Monitor continuously monitors the quality of Amazon SageMaker machine learning models in production. It enables developers to set alerts for when there are deviations in the model quality. Early and pro-active detection of these deviations enables corrective actions, such as retraining models, auditing upstream systems, or fixing data quality issues without having to monitor models manually or build additional tooling.

Amazon SageMaker Clarify Model Explainability Monitor is a model monitor that helps data scientists and ML engineers monitor predictions for feature attribution drift on a regular basis. A drift in the distribution of live data for models in production can result in a corresponding drift in the feature attribution values. As the model is monitored, customers can view exportable reports and graphs detailing feature attributions in SageMaker Studio and configure alerts in Amazon CloudWatch to receive notifications if it is detected that the attribution values drift beyond a certain threshold.

This notebook demonstrates the process for setting up a SageMaker Clarify Feature Attribution Drift Monitor for continuous monitoring of feature attribution drift of the data and model used by a regularly running SageMaker Batch Transform job. The model input and output are in CSV format.

In general, you can use the model explainability monitor for batch transform in this way,

  1. Schedule a model explainability monitor to monitor a data capture S3 location

  2. Regularly run transform jobs with data capture enabled, the jobs save captured data to the data capture S3 URI

The monitor executes processing jobs regularly to do feature attribution analysis, and then generate analysis reports and publish metrics to CloudWatch.

General Setup

The notebook uses the SageMaker Python SDK. The following cell upgrades the SDK and its dependencies. Then you may need to restart the kernel and rerun the notebook to pick up the up-to-date APIs, if the notebook is executed in the SageMaker Studio.

[ ]:
!pip install -U sagemaker
!pip install -U boto3
!pip install -U botocore

Imports

The following cell imports the APIs to be used by the notebook.

[ ]:
import sagemaker
import pandas as pd
import copy
import datetime
import io
import json
import os
import pprint
import time

Handful of configuration

To begin, ensure that these prerequisites have been completed.

  • Specify an AWS Region to host the model.

  • Specify an IAM role to execute jobs.

  • Define the S3 URIs that stores the model file, input data and output data. For demonstration purposes, this notebook uses the same bucket for them. In reality, they could be separated with different security policies.

[ ]:
sagemaker_session = sagemaker.Session()

region = sagemaker_session.boto_region_name
print(f"AWS region: {region}")

role = sagemaker.get_execution_role()
print(f"RoleArn: {role}")

# A different bucket can be used, but make sure the role for this notebook has
# the s3:PutObject permissions. This is the bucket into which the data is captured
bucket = sagemaker_session.default_bucket()
print(f"Demo Bucket: {bucket}")
prefix = sagemaker.utils.unique_name_from_base("sagemaker/DEMO-ClarifyModelMonitor")
print(f"Demo Prefix: {prefix}")
s3_key = f"s3://{bucket}/{prefix}"
print(f"Demo S3 key: {s3_key}")

data_capture_s3_uri = f"{s3_key}/data-capture"
transform_output_s3_uri = f"{s3_key}/transform-output"
baselining_output_s3_uri = f"{s3_key}/baselining-output"
monitor_output_s3_uri = f"{s3_key}/monitor-output"

print(f"The transform job will save the results to: {transform_output_s3_uri}")
print(f"The transform job will save the captured data to: {data_capture_s3_uri}")
print(f"The baselining job will save the analysis results to: {baselining_output_s3_uri}")
print(f"The monitor will save the analysis results to: {monitor_output_s3_uri}")

Data files

This example includes two dataset files, both in CSV format.

  • The train dataset has header row, and it has a target column followed by the feature columns.

  • The test dataset is not headers, and it only has feature columns.

[ ]:
train_dataset_path = "test_data/validation-dataset-with-header.csv"
test_dataset_path = "test_data/test-dataset-input-cols.csv"
dataset_type = "text/csv"
[ ]:
df = pd.read_csv(train_dataset_path)
df.head(5)
[ ]:
# Read headers
all_headers = list(df.columns)
label_header = all_headers[0]

To verify that the execution role for this notebook has the necessary permissions to proceed, put a simple test object into the S3 bucket specified above. If this command fails, update the role to have s3:PutObject permission on the bucket and try again.

[ ]:
sagemaker.s3.S3Uploader.upload_string_as_file_body(
    body="hello",
    desired_s3_uri=f"{s3_key}/upload-test-file.txt",
    sagemaker_session=sagemaker_session,
)
print("Success! We are all set to proceed with uploading to S3.")

Then upload the data files to S3 so that they can be used by SageMaker jobs.

[ ]:
train_data_s3_uri = sagemaker.s3.S3Uploader.upload(
    local_path=train_dataset_path,
    desired_s3_uri=s3_key,
    sagemaker_session=sagemaker_session,
)
print(f"Train data is uploaded to: {train_data_s3_uri}")
test_data_s3_uri = sagemaker.s3.S3Uploader.upload(
    local_path=test_dataset_path,
    desired_s3_uri=s3_key,
    sagemaker_session=sagemaker_session,
)
print(f"Test data is uploaded to: {test_data_s3_uri}")

SageMaker model

This example includes a pre-built SageMaker XGBoost model file trained by XGBoost Churn Prediction Notebook. The following cell uploads the file to S3 and then creates a SageMaker model using it. The model support CSV data format, the input are customer attributes, and the output is the probability of customer churn (a float number between zero and one).

[ ]:
model_file = "model/xgb-churn-prediction-model.tar.gz"
model_url = sagemaker.s3.S3Uploader.upload(
    local_path=model_file,
    desired_s3_uri=s3_key,
    sagemaker_session=sagemaker_session,
)
print(f"Model file has been uploaded to {model_url}")

model_name = sagemaker.utils.unique_name_from_base("DEMO-xgb-churn-pred-model-monitor")
print(f"SageMaker model name: {model_name}")

image_uri = sagemaker.image_uris.retrieve("xgboost", region, "0.90-1")
print(f"SageMaker XGBoost image: {image_uri}")

model = sagemaker.model.Model(image_uri=image_uri, model_data=model_url, role=role)
container_def = model.prepare_container_def()
sagemaker_session.create_model(model_name, role, container_def)
print("SageMaker model created")

Batch Transform Job

For continuous monitoring, batch transform jobs should be executed regularly with the latest data. But for demonstration purpose, the following cell only executes the job once before the monitor is scheduled, so that the first monitoring execution has captured data to process.

See Transformer for the API reference. The destination_s3_uri is used to specify the data capture S3 URI which is a key connection between the job and the monitor.

NOTE: The following cell takes about 5 minutes to run.

[ ]:
transfomer = model.transformer(
    instance_count=1,
    instance_type="ml.m5.xlarge",
    accept=dataset_type,  # The transform output data format
    assemble_with="Line",  # CSV records are terminated by new lines
    output_path=transform_output_s3_uri,
)

transfomer.transform(
    data=test_data_s3_uri,
    content_type=dataset_type,  # The transform input format
    split_type="Line",  # CSV records are terminated by new lines
    batch_data_capture_config=sagemaker.inputs.BatchDataCaptureConfig(
        destination_s3_uri=data_capture_s3_uri,
    ),
    wait=True,  # In real world you don't have to wait, but for demo purpose we wait for the output
    logs=False,  # You can change it to True to view job logs inline
)

Captured data

Once the transform job completed, an “input” folders is created under data_capture_s3_uri, to includes the captured data files of transform input. Note that, batch transform data capture is unlike endpoint data capture, it does not capture the data for real as it will create tremendous amount of duplications. Instead, it generates manifest files which refer to the transform output S3 location.

Now list the captured data files stored in Amazon S3. There should be different files from different time periods organized based on the hour in which the batch transformation occurred. The format of the Amazon S3 path is:

s3://{data_capture_s3_uri}/input/yyyy/mm/dd/hh/filename.jsonl

[ ]:
data_capture_output = f"{data_capture_s3_uri}/input"
captured_data_files = sorted(
    sagemaker.s3.S3Downloader.list(
        s3_uri=data_capture_output,
        sagemaker_session=sagemaker_session,
    )
)
print("Found capture data files:")
print("\n ".join(captured_data_files[-5:]))
[ ]:
captured_data_file = captured_data_files[-1]
captured_data_file_content = sagemaker.s3.S3Downloader.read_file(
    s3_uri=captured_data_files[-1],
    sagemaker_session=sagemaker_session,
)
data_capture_input_dict = json.loads(captured_data_file_content)
print(json.dumps(data_capture_input_dict, indent=4))
[ ]:
def upload_captured_data(offset):
    yyyy_mm_dd_hh = "%Y/%m/%d/%H"
    file_path, file_name = os.path.split(captured_data_file)
    this_hour_str = file_path[len(data_capture_output) + 1 :]  # like "2023/01/18/22"
    this_hour = datetime.datetime.strptime(this_hour_str, yyyy_mm_dd_hh)
    next_hour = this_hour + datetime.timedelta(hours=offset)
    next_hour_str = next_hour.strftime(yyyy_mm_dd_hh)  # like "2023/01/18/23"
    sagemaker.s3.S3Uploader.upload_string_as_file_body(
        body=captured_data_file_content,
        desired_s3_uri=f"{data_capture_output}/{next_hour_str}/{file_name}",
        sagemaker_session=sagemaker_session,
    )


# For demostration purpose, only needed for this example
# copy the captured file to the last hour's folder, just in case the first monitoring execution is started in this hour.
upload_captured_data(-1)
# copy the captured file to the next hour's folder, just in case the first monitoring execution is started after next hour.
upload_captured_data(1)

Transform input

The captured data file refers to the transform input file. The cell below shows the first few records of the file.

[ ]:
transform_input = data_capture_input_dict[0]["prefix"]
transform_input_body = sagemaker.s3.S3Downloader.read_file(
    s3_uri=transform_input,
    sagemaker_session=sagemaker_session,
)
transform_input_df = pd.read_csv(io.StringIO(transform_input_body), header=None)
transform_input_df.head()

Model Explainability Monitor

Similar to the other monitoring types, the standard procedure of creating a feature attribution drift monitor is first run a baselining job, and then schedule the monitor.

[ ]:
model_explainability_monitor = sagemaker.model_monitor.ModelExplainabilityMonitor(
    role=role,
    sagemaker_session=sagemaker_session,
    max_runtime_in_seconds=3600,
)

Baselining job

A baselining job runs predictions on training dataset and suggests constraints. The suggest_baseline() method of ModelExplainabilityMonitor starts a SageMaker Clarify processing job to generate the constraints.

The step is not mandatory, but providing constraints file to the monitor can enable violations file generation.

Configurations

Information about the input data need to be provided to the processor.

DataConfig stores information about the dataset to be analyzed. For example, the dataset file, its format (like CSV), headers and label (ground truth label is not needed for the explainability analysis, the parameter is specified so that the job knows it should be excluded from the dataset).

[ ]:
data_config = sagemaker.clarify.DataConfig(
    s3_data_input_path=train_data_s3_uri,
    s3_output_path=baselining_output_s3_uri,
    label=label_header,
    headers=all_headers,
    dataset_type=dataset_type,
)

ModelConfig is configuration related to model to be used for inferencing. In order to compute SHAP values, the SageMaker Clarify explainer generates synthetic dataset and then get its predictions for the SageMaker model. To accomplish this, the processing job will use the model to create an ephemeral endpoint (also known as “shadow endpoint”). The processing job will delete the shadow endpoint after the computations are completed.

[ ]:
model_config = sagemaker.clarify.ModelConfig(
    model_name=model_name,  # The name of the SageMaker model
    instance_count=1,  # The instance count of the shadow endpoint
    instance_type="ml.m5.xlarge",  # The instance type of the shadow endpoint
    content_type=dataset_type,  # The data format of the model input
    accept_type=dataset_type,  # The data format of the model output
)

Currently, the SageMaker Clarify explainer offers a scalable and efficient implementation of SHAP, so the explainability config is SHAPConfig, including

  • baseline: A list of records (at least one) to be used as the baseline dataset in the Kernel SHAP algorithm, each record is a list of features. It can also be a S3 object URI, the S3 file should be in the same format as dataset, and it should contain only the feature columns/values and omit the label column/values.

  • num_samples: Number of samples to be used in the Kernel SHAP algorithm. This number determines the size of the generated synthetic dataset to compute the SHAP values.

  • agg_method: Aggregation method for global SHAP values. Valid values are

    • “mean_abs” (mean of absolute SHAP values for all instances),

    • “median” (median of SHAP values for all instances) and

    • “mean_sq” (mean of squared SHAP values for all instances).

  • use_logit: Indicator of whether the logit function is to be applied to the model predictions. Default is False. If “use_logit” is true then the SHAP values will have log-odds units.

  • save_local_shap_values: Indicator of whether to save the local SHAP values in the output location. Default is True.

[ ]:
# Here use the mean value of train dataset as SHAP baseline
shap_baseline = [list(df.drop([label_header], axis=1).mean().round().astype(int))]
print(f"SHAP baseline: {shap_baseline}")

shap_config = sagemaker.clarify.SHAPConfig(
    baseline=shap_baseline,
    num_samples=100,
    agg_method="mean_abs",
    save_local_shap_values=False,
)

Kick off baselining job

Call the suggest_baseline() method to start the baselining job. The example model returns a single probability value between 0 and 1. So, the model_scores parameter is set to zero, which is the index of the probability value in the CSV model output.

[ ]:
model_explainability_monitor.suggest_baseline(
    explainability_config=shap_config,
    data_config=data_config,
    model_config=model_config,
    model_scores=0,  # The zero-based index of the probability (score) in model output
)

NOTE: The following cell waits until the baselining job is completed (in about 10 minutes). It then inspects the suggested constraints. This step can be skipped, because the monitor to be scheduled will automatically pick up baselining job name and wait for it before monitoring execution.

[ ]:
model_explainability_monitor.latest_baselining_job.wait(logs=False)
print()
model_explainability_constraints = model_explainability_monitor.suggested_constraints()
print(f"Suggested constraints: {model_explainability_constraints.file_s3_uri}")
print(
    sagemaker.s3.S3Downloader.read_file(
        s3_uri=model_explainability_constraints.file_s3_uri,
        sagemaker_session=sagemaker_session,
    )
)

Monitoring Schedule

With above constraints collected, now call create_monitoring_schedule() method to schedule an hourly model explainability monitor.

If a baselining job has been submitted, then the monitor object will automatically pick up the analysis configuration from the baselining job. But if the baselining step is skipped, or if the capture dataset has different nature than the training dataset, then analysis configuration has to be provided.

ModelConfig is required by ExplainabilityAnalysisConfig for the same reason as it is required by the baselining job. Note that only features are required for computing feature attribution, so ground truth label should be excluded.

Highlights,

  • data_capture_s3_uri is the location of data captured by the batch transform job

  • probability_attribute stores the index of the probability value in model output, similar to the model_scores parameter of the suggest_baseline() method.

[ ]:
schedule_expression = sagemaker.model_monitor.CronExpressionGenerator.hourly()
[ ]:
model_explainability_analysis_config = None
if not model_explainability_monitor.latest_baselining_job:
    # Remove label because only features are required for the analysis
    headers_without_label_header = copy.deepcopy(all_headers)
    headers_without_label_header.remove(label_header)
    model_explainability_analysis_config = sagemaker.model_monitor.ExplainabilityAnalysisConfig(
        explainability_config=shap_config,
        model_config=model_config,
        headers=headers_without_label_header,
    )
model_explainability_monitor.create_monitoring_schedule(
    analysis_config=model_explainability_analysis_config,
    batch_transform_input=sagemaker.model_monitor.BatchTransformInput(
        data_captured_destination_s3_uri=data_capture_s3_uri,
        destination="/opt/ml/processing/transform",
        dataset_format=sagemaker.model_monitor.MonitoringDatasetFormat.csv(header=False),
    ),
    output_s3_uri=monitor_output_s3_uri,
    schedule_cron_expression=schedule_expression,
)
print(
    f"Model explainability monitoring schedule: {model_explainability_monitor.monitoring_schedule_name}"
)

Wait for the first execution

The schedule starts jobs at the previously specified intervals. Code below waits until time crosses the hour boundary (in UTC) to see executions kick off.

Note: Even for an hourly schedule, Amazon SageMaker has a buffer period of 20 minutes to schedule executions. The execution might start in anywhere from zero to ~20 minutes from the hour boundary. This is expected and done for load balancing in the backend.

[ ]:
def wait_for_execution_to_start(model_monitor):
    print(
        "An hourly schedule was created above and it will kick off executions ON the hour (plus 0 - 20 min buffer)."
    )

    print("Waiting for the first execution to happen", end="")
    schedule_desc = model_monitor.describe_schedule()
    while "LastMonitoringExecutionSummary" not in schedule_desc:
        schedule_desc = model_monitor.describe_schedule()
        print(".", end="", flush=True)
        time.sleep(60)
    print()
    print("Done! Execution has been created")

    print("Now waiting for execution to start", end="")
    while schedule_desc["LastMonitoringExecutionSummary"]["MonitoringExecutionStatus"] in "Pending":
        schedule_desc = model_monitor.describe_schedule()
        print(".", end="", flush=True)
        time.sleep(10)

    print()
    print("Done! Execution has started")

NOTE: The following cell waits until the first monitoring execution is started. As explained above, the wait could take more than 60 minutes.

[ ]:
wait_for_execution_to_start(model_explainability_monitor)

In real world, a monitoring schedule is supposed to be active all the time. But in this example, it can be stopped to avoid incurring extra charges. A stopped schedule will not trigger further executions, but the ongoing execution will continue. And if needed, the schedule can be restarted by start_monitoring_schedule().

[ ]:
model_explainability_monitor.stop_monitoring_schedule()

Wait for the execution to finish

In the previous cell, the first execution has started. This section waits for the execution to finish so that its analysis results are available. Here are the possible terminal states and what each of them mean:

  • Completed - This means the monitoring execution completed, and no issues were found in the violations report.

  • CompletedWithViolations - This means the execution completed, but constraint violations were detected.

  • Failed - The monitoring execution failed, maybe due to client error (perhaps incorrect role permissions) or infrastructure issues. Further examination of FailureReason and ExitMessage is necessary to identify what exactly happened.

  • Stopped - job exceeded max runtime or was manually stopped.

[ ]:
# Waits for the schedule to have last execution in a terminal status.
def wait_for_execution_to_finish(model_monitor):
    schedule_desc = model_monitor.describe_schedule()
    execution_summary = schedule_desc.get("LastMonitoringExecutionSummary")
    if execution_summary is not None:
        print("Waiting for execution to finish", end="")
        while execution_summary["MonitoringExecutionStatus"] not in [
            "Completed",
            "CompletedWithViolations",
            "Failed",
            "Stopped",
        ]:
            print(".", end="", flush=True)
            time.sleep(60)
            schedule_desc = model_monitor.describe_schedule()
            execution_summary = schedule_desc["LastMonitoringExecutionSummary"]
        print()
        print(f"Done! Execution Status: {execution_summary['MonitoringExecutionStatus']}")
    else:
        print("Last execution not found")

NOTE: The following cell takes about 10 minutes.

[ ]:
wait_for_execution_to_finish(model_explainability_monitor)

Inspect execution results

List the generated reports,

  • analysis.json includes the global SHAP values.

  • report.* files are static report files to visualize the SHAP values.

[ ]:
schedule_desc = model_explainability_monitor.describe_schedule()
execution_summary = schedule_desc.get("LastMonitoringExecutionSummary")
if execution_summary and execution_summary["MonitoringExecutionStatus"] in [
    "Completed",
    "CompletedWithViolations",
]:
    last_model_explainability_monitor_execution = model_explainability_monitor.list_executions()[-1]
    last_model_explainability_monitor_execution_report_uri = (
        last_model_explainability_monitor_execution.output.destination
    )
    print(f"Report URI: {last_model_explainability_monitor_execution_report_uri}")
    last_model_explainability_monitor_execution_report_files = sorted(
        sagemaker.s3.S3Downloader.list(
            s3_uri=last_model_explainability_monitor_execution_report_uri,
            sagemaker_session=sagemaker_session,
        )
    )
    print("Found Report Files:")
    print("\n ".join(last_model_explainability_monitor_execution_report_files))
else:
    last_model_explainability_monitor_execution = None
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )

If there are any violations compared to the baseline, they are listed here. See Feature Attribution Drift Violations for the schema of the file, and how violations are detected.

[ ]:
violations = model_explainability_monitor.latest_monitoring_constraint_violations()
if violations is not None:
    pprint.PrettyPrinter(indent=4).pprint(violations.body_dict)

By default, the analysis results are also published to CloudWatch, see CloudWatch Metrics for Feature Attribution Drift Analysis.

Cleanup

If there is no plan to collect more data for feature attribution drift monitoring, then the monitor should be stopped (and deleted) to avoid incurring additional charges. Note that deleting the monitor does not delete the data in S3.

[ ]:
model_explainability_monitor.stop_monitoring_schedule()
wait_for_execution_to_finish(model_explainability_monitor)
model_explainability_monitor.delete_monitoring_schedule()
sagemaker_session.delete_model(model_name)

Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.

This us-east-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This us-east-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This us-west-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ca-central-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This sa-east-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-west-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-west-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-west-3 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-central-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This eu-north-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-southeast-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-southeast-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-northeast-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-northeast-2 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable

This ap-south-1 badge failed to load. Check your device’s internet connectivity, otherwise the service is currently unavailable