Amazon SageMaker Clarify Model Monitors
This notebook’s CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.
This notebook shows how to:
* Host a machine learning model in Amazon SageMaker and capture inference requests, results, and metadata.
* Schedule a Clarify bias monitor to monitor predictions for bias drift on a regular basis.
* Schedule a Clarify explainability monitor to monitor predictions for feature attribution drift on a regular basis.
Background
Amazon SageMaker Model Monitor continuously monitors the quality of Amazon SageMaker machine learning models in production. It enables developers to set alerts for when there are deviations in the model quality. Early and pro-active detection of these deviations enables corrective actions, such as retraining models, auditing upstream systems, or fixing data quality issues without having to monitor models manually or build additional tooling.
Amazon SageMaker Clarify bias monitoring helps data scientists and ML engineers monitor predictions for bias on a regular basis. One way bias can be introduced in deployed ML models is when the data used in training differs from the data used to generate predictions. This is especially pronounced if the data used for training changes over time (e.g. fluctuating mortgage rates), and the model prediction will not be accurate unless the model is retrained with updated data. For example, a model for predicting home prices can be biased if the mortgage rates used to train the model differ from the most current real-world mortgage rate.
Amazon SageMaker Clarify explainability monitoring offers tools to provide global explanations of models and to explain the predictions of a deployed model producing inferences. Such model explanation tools can help ML modelers, developers, and other internal stakeholders understand model characteristics as a whole prior to deployment and debug predictions provided by the model once deployed. The current offering includes a scalable and efficient implementation of SHAP, based on the concept of the Shapley value from the field of cooperative game theory, which assigns each feature an importance value for a particular prediction.
As the model is monitored, customers can view exportable reports and graphs detailing bias and feature attributions in SageMaker Studio and configure alerts in Amazon CloudWatch to receive notifications if violations are detected.
General Setup
To get started, make sure these prerequisites are completed:
* Specify an AWS Region to host the model.
* An IAM role ARN exists that is used to give Amazon SageMaker access to data in Amazon Simple Storage Service (Amazon S3). See the documentation for how to fine-tune the permissions needed.
* Create an S3 bucket used to store the test dataset, any additional model data, data captured from model invocations, and ground truth data. For demonstration purposes, this notebook uses the same bucket for all of these. In reality, they could be separated with different security policies.
Imports
Import APIs to be used by the notebook.
[ ]:
import copy
import json
import random
import time
import pandas as pd
from datetime import datetime, timedelta
from sagemaker import get_execution_role, image_uris, Session
from sagemaker.clarify import (
BiasConfig,
DataConfig,
ModelConfig,
ModelPredictedLabelConfig,
SHAPConfig,
)
from sagemaker.model import Model
from sagemaker.model_monitor import (
BiasAnalysisConfig,
CronExpressionGenerator,
DataCaptureConfig,
EndpointInput,
ExplainabilityAnalysisConfig,
ModelBiasMonitor,
ModelExplainabilityMonitor,
)
from sagemaker.s3 import S3Downloader, S3Uploader
Handful of configuration
[ ]:
role = get_execution_role()
print(f"RoleArn: {role}")
sagemaker_session = Session()
sagemaker_client = sagemaker_session.sagemaker_client
sagemaker_runtime_client = sagemaker_session.sagemaker_runtime_client
region = sagemaker_session.boto_region_name
print(f"AWS region: {region}")
# A different bucket can be used, but make sure the role for this notebook has
# the s3:PutObject permissions. This is the bucket into which the data is captured
bucket = Session().default_bucket()
print(f"Demo Bucket: {bucket}")
prefix = "sagemaker/DEMO-ClarifyModelMonitor-20200901"
s3_key = f"s3://{bucket}/{prefix}"
print(f"S3 key: {s3_key}")
s3_capture_upload_path = f"{s3_key}/datacapture"
ground_truth_upload_path = f"{s3_key}/ground_truth_data/{datetime.now():%Y-%m-%d-%H-%M-%S}"
s3_report_path = f"{s3_key}/reports"
print(f"Capture path: {s3_capture_upload_path}")
print(f"Ground truth path: {ground_truth_upload_path}")
print(f"Report path: {s3_report_path}")
baseline_results_uri = f"{s3_key}/baselining"
print(f"Baseline results uri: {baseline_results_uri}")
endpoint_instance_count = 1
endpoint_instance_type = "ml.m5.large"
schedule_expression = CronExpressionGenerator.hourly()
Model files and data files
The prebuilt model, and a couple of dataset files.
[ ]:
model_file = "model/xgb-churn-prediction-model.tar.gz"
test_file = "test_data/upload-test-file.txt"
test_dataset = "test_data/test-dataset-input-cols.csv"
validation_dataset = "test_data/validation-dataset-with-header.csv"
dataset_type = "text/csv"
with open(validation_dataset) as f:
    headers_line = f.readline().rstrip()
all_headers = headers_line.split(",")
label_header = all_headers[0]
To verify that the execution role for this notebook has the necessary permissions to proceed, put a simple test object into the S3 bucket specified above. If this command fails, update the role to have the s3:PutObject permission on the bucket and try again.
[ ]:
# Upload a test file
S3Uploader.upload(test_file, f"s3://{bucket}/test_upload")
print("Success! We are all set to proceed.")
PART A: Capturing real-time inference data from Amazon SageMaker endpoints
Create an endpoint to showcase the data capture capability in action. (In the next parts, model monitors will be created to process the captured data.)
Upload the pre-trained model to Amazon S3
As an example, this code uploads a pre-trained XGBoost model that is ready to be deployed. This model was trained using the XGBoost Churn Prediction Notebook in SageMaker.
[ ]:
model_url = S3Uploader.upload(model_file, s3_key)
print(f"Model file has been uploaded to {model_url}")
Deploy the model to Amazon SageMaker
Start by deploying the pre-trained churn prediction model. Here, create the SageMaker Model object with the image and model data.
[ ]:
model_name = f"DEMO-xgb-churn-pred-model-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"
print("Model name: ", model_name)
endpoint_name = f"DEMO-xgb-churn-model-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"
print("Endpoint name: ", endpoint_name)
To enable data capture for monitoring jobs, specify the capture option called DataCaptureConfig. It enables capturing the request payload and the response payload of the endpoint. The capture config applies to all variants. Go ahead with the deployment.
[ ]:
image_uri = image_uris.retrieve("xgboost", region, "0.90-1")
print(f"XGBoost image uri: {image_uri}")
model = Model(
role=role,
name=model_name,
image_uri=image_uri,
model_data=model_url,
sagemaker_session=sagemaker_session,
)
data_capture_config = DataCaptureConfig(
enable_capture=True,
sampling_percentage=100,
destination_s3_uri=s3_capture_upload_path,
)
print(f"Deploying model {model_name} to endpoint {endpoint_name}")
model.deploy(
initial_instance_count=endpoint_instance_count,
instance_type=endpoint_instance_type,
endpoint_name=endpoint_name,
data_capture_config=data_capture_config,
)
Invoke the deployed model
Now send data to this endpoint to get inferences in real time. Because data capture is enabled in the previous steps, the request and response payload, along with some additional metadata, is saved in the Amazon S3 location specified in the DataCaptureConfig.
[ ]:
print(f"Sending test traffic to the endpoint {endpoint_name}. \nPlease wait", end="")
test_dataset_size = 0 # record the number of rows in data we're sending for inference
with open(test_dataset, "r") as f:
    for row in f:
        if test_dataset_size < 120:
            payload = row.rstrip("\n")
            response = sagemaker_runtime_client.invoke_endpoint(
                EndpointName=endpoint_name,
                Body=payload,
                ContentType=dataset_type,
            )
            prediction = response["Body"].read()
            print(".", end="", flush=True)
            time.sleep(0.5)
        test_dataset_size += 1
print()
print("Done!")
View captured data
Now list the data capture files stored in Amazon S3. There should be different files from different time periods organized based on the hour in which the invocation occurred. The format of the Amazon S3 path is:
s3://{destination-bucket-prefix}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/filename.jsonl
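As a quick orientation (a hedged sketch, not part of the original walkthrough), the cell below builds the prefix expected for captures from the current UTC hour; it assumes the default production variant name "AllTraffic".
[ ]:
# Hedged sketch: construct the capture prefix expected for the current UTC hour.
# "AllTraffic" is the default variant name assumed here; adjust if a custom variant is used.
expected_hourly_prefix = (
    f"{s3_capture_upload_path}/{endpoint_name}/AllTraffic/{datetime.utcnow():%Y/%m/%d/%H}/"
)
print(f"Expected capture prefix for this hour: {expected_hourly_prefix}")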
[ ]:
print("Waiting 30 seconds for captures to show up", end="")
for _ in range(30):
    capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
    if capture_files:
        break
    print(".", end="", flush=True)
    time.sleep(1)
print()
print("Found Capture Files:")
print("\n ".join(capture_files[-5:]))
Next, view the content of a single capture file. Take a quick peek at a few of the lines in the most recent captured file.
[ ]:
capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")[-10:-1]
print(capture_file[-1])
Finally, the content of a single line is printed below as formatted JSON to make it easier to read.
[ ]:
print(json.dumps(json.loads(capture_file[-1]), indent=2))
Start generating some artificial traffic
The cell below starts a thread to send some traffic to the endpoint. If there is no traffic, the monitoring jobs are marked as Failed
since there is no data to process.
[ ]:
import threading
class WorkerThread(threading.Thread):
    def __init__(self, do_run, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.__do_run = do_run
        self.__terminate_event = threading.Event()

    def terminate(self):
        self.__terminate_event.set()

    def run(self):
        while not self.__terminate_event.is_set():
            self.__do_run(self.__terminate_event)
[ ]:
def invoke_endpoint(terminate_event):
    with open(test_dataset, "r") as f:
        i = 0
        for row in f:
            payload = row.rstrip("\n")
            response = sagemaker_runtime_client.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType="text/csv",
                Body=payload,
                InferenceId=str(i),  # unique ID per row
            )
            i += 1
            response["Body"].read()
            time.sleep(1)
            if terminate_event.is_set():
                break
# Keep invoking the endpoint with test data
invoke_endpoint_thread = WorkerThread(do_run=invoke_endpoint)
invoke_endpoint_thread.start()
Notice the inferenceId attribute used above to invoke the endpoint. If it is present, it will be used to join with the ground truth data; otherwise the eventId will be used.
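As a quick check (a hedged sketch, not part of the original walkthrough), the newest capture record can be inspected for its eventMetadata once captures from this new traffic have landed in S3; the exact record layout is assumed here.
[ ]:
# Hedged sketch: peek at the eventMetadata of the most recent captured record.
# Assumes at least one capture file from the InferenceId traffic has already been written.
latest_capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
latest_record = json.loads(S3Downloader.read_file(latest_capture_files[-1]).splitlines()[-1])
print(json.dumps(latest_record.get("eventMetadata", {}), indent=2))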
Start generating some fake ground truth
Besides captures, model bias monitoring execution also requires ground truth data. In real use cases, ground truth data should be regularly collected and uploaded to a designated S3 location. In this example notebook, the code snippet below is used to generate fake ground truth data. The first-party merge container will combine the captures and ground truth data, and the merged data will be passed to the model bias monitoring job for analysis. Similar to captures, the model bias monitoring execution will fail if there is no data to merge.
[ ]:
import random
def ground_truth_with_id(inference_id):
    random.seed(inference_id)  # to get consistent results
    rand = random.random()
    # format required by the merge container
    return {
        "groundTruthData": {
            "data": "1" if rand < 0.7 else "0",  # randomly generate positive labels 70% of the time
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }


def upload_ground_truth(upload_time):
    records = [ground_truth_with_id(i) for i in range(test_dataset_size)]
    fake_records = [json.dumps(r) for r in records]
    data_to_upload = "\n".join(fake_records)
    target_s3_uri = f"{ground_truth_upload_path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(fake_records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)
[ ]:
# Generate data for the last hour
upload_ground_truth(datetime.utcnow() - timedelta(hours=1))
[ ]:
# Generate data once an hour
def generate_fake_ground_truth(terminate_event):
    upload_ground_truth(datetime.utcnow())
    for _ in range(0, 60):
        time.sleep(60)
        if terminate_event.is_set():
            break


ground_truth_thread = WorkerThread(do_run=generate_fake_ground_truth)
ground_truth_thread.start()
PART B: Model Bias Monitor
The model bias monitor can detect bias drift of machine learning models on a regular basis. Similar to the other monitoring types, the standard procedure for creating a model bias monitor is to first run a baselining job and then create a monitoring schedule.
[ ]:
model_bias_monitor = ModelBiasMonitor(
role=role,
sagemaker_session=sagemaker_session,
max_runtime_in_seconds=1800,
)
Create a baselining job
A baselining job runs predictions on the training dataset and suggests constraints. The suggest_baseline() method starts a SageMakerClarifyProcessor processing job using the SageMaker Clarify container to generate the constraints.
This step is not mandatory, but providing a constraints file to the monitor enables violations file generation.
Configurations
Information about the input data needs to be provided to the processor.
DataConfig stores information about the dataset to be analyzed, for example the dataset file, its format (CSV or JSONLines), headers (if any), and label.
[ ]:
model_bias_baselining_job_result_uri = f"{baseline_results_uri}/model_bias"
model_bias_data_config = DataConfig(
s3_data_input_path=validation_dataset,
s3_output_path=model_bias_baselining_job_result_uri,
label=label_header,
headers=all_headers,
dataset_type=dataset_type,
)
BiasConfig is the configuration of the sensitive groups in the dataset. Typically, bias is measured by computing a metric and comparing it across groups. The group of interest is specified using the "facet." For post-training bias, the positive label should also be taken into account.
[ ]:
model_bias_config = BiasConfig(
label_values_or_threshold=[1],
facet_name="Account Length",
facet_values_or_threshold=[100],
)
ModelPredictedLabelConfig specifies how to extract a predicted label from the model output. This model returns the probability that a user will churn, so here an arbitrary cutoff of 0.8 is chosen to consider that a customer will churn. For more complicated outputs there are a few more options; for example, "label" is the index, name, or JMESPath expression used to locate the predicted label in the endpoint response payload (see the sketch after the next cell).
[ ]:
model_predicted_label_config = ModelPredictedLabelConfig(
probability_threshold=0.8,
)
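For illustration only, the sketch below shows how ModelPredictedLabelConfig could be configured for a model whose response is a JSON document instead of a bare probability. The JMESPath expressions are hypothetical, and this object is not used elsewhere in the notebook.
[ ]:
# Hedged sketch (not used later): locate the label and score inside a JSON response payload.
# The expressions "predicted_label" and "probability" are illustrative assumptions.
alternative_predicted_label_config = ModelPredictedLabelConfig(
    label="predicted_label",  # index, name, or JMESPath expression for the predicted label
    probability="probability",  # index, name, or JMESPath expression for the score
    probability_threshold=0.8,
)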
ModelConfig is the configuration of the model to be used for inference. In order to compute post-training bias metrics, the computation needs to get inferences from the model name provided. To accomplish this, the processing job will use the model to create an ephemeral endpoint (also known as a "shadow endpoint"). The processing job will delete the shadow endpoint after the computations are completed. This configuration is also used by the explainability monitor.
[ ]:
model_config = ModelConfig(
model_name=model_name,
instance_count=endpoint_instance_count,
instance_type=endpoint_instance_type,
content_type=dataset_type,
accept_type=dataset_type,
)
Kick off baselining job
[ ]:
model_bias_monitor.suggest_baseline(
model_config=model_config,
data_config=model_bias_data_config,
bias_config=model_bias_config,
model_predicted_label_config=model_predicted_label_config,
)
print(f"ModelBiasMonitor baselining job: {model_bias_monitor.latest_baselining_job_name}")
The cell below waits until the baselining job is completed and then inspects the suggested constraints. This step can be skipped, because the monitor to be scheduled will automatically pick up the baselining job name and wait for it before the monitoring execution.
[ ]:
model_bias_monitor.latest_baselining_job.wait(logs=False)
model_bias_constraints = model_bias_monitor.suggested_constraints()
print()
print(f"ModelBiasMonitor suggested constraints: {model_bias_constraints.file_s3_uri}")
print(S3Downloader.read_file(model_bias_constraints.file_s3_uri))
Schedule model bias monitor
With the above constraints collected, now call the create_monitoring_schedule() method to schedule an hourly monitor to analyze the data. If a baselining job has been submitted, the monitor will automatically pick up the analysis configuration from the baselining job. But if the baselining step is skipped, or if the captured dataset has a different nature than the training dataset, an analysis configuration has to be provided.
BiasAnalysisConfig is a subset of the configuration of the baselining job; many options are not needed because:
* The model bias monitor merges captures and ground truth data and uses the merged data as the dataset, so no DataConfig is needed.
* Captures already include predictions, so there is no need to create a shadow endpoint and no ModelConfig is needed.
* Attributes like the probability threshold are provided as part of EndpointInput, so no ModelPredictedLabelConfig is needed.
[ ]:
model_bias_analysis_config = None
if not model_bias_monitor.latest_baselining_job:
    model_bias_analysis_config = BiasAnalysisConfig(
        model_bias_config,
        headers=all_headers,
        label=label_header,
    )
model_bias_monitor.create_monitoring_schedule(
    analysis_config=model_bias_analysis_config,
    output_s3_uri=s3_report_path,
    endpoint_input=EndpointInput(
        endpoint_name=endpoint_name,
        destination="/opt/ml/processing/input/endpoint",
        start_time_offset="-PT1H",
        end_time_offset="-PT0H",
        probability_threshold_attribute=0.8,
    ),
    ground_truth_input=ground_truth_upload_path,
    schedule_cron_expression=schedule_expression,
)
print(f"Model bias monitoring schedule: {model_bias_monitor.monitoring_schedule_name}")
Wait for the first execution
The schedule starts jobs at the previously specified intervals. The code below waits until the time crosses the hour boundary (in UTC) to see executions kick off.
Note: Even for an hourly schedule, Amazon SageMaker has a buffer period of 20 minutes to schedule executions. The execution might start anywhere from zero to ~20 minutes after the hour boundary. This is expected and done for load balancing in the backend.
[ ]:
def wait_for_execution_to_start(model_monitor):
    print(
        "An hourly schedule was created above and it will kick off executions ON the hour (plus 0 - 20 min buffer)."
    )

    print("Waiting for the first execution to happen", end="")
    schedule_desc = model_monitor.describe_schedule()
    while "LastMonitoringExecutionSummary" not in schedule_desc:
        schedule_desc = model_monitor.describe_schedule()
        print(".", end="", flush=True)
        time.sleep(60)
    print()
    print("Done! Execution has been created")

    print("Now waiting for execution to start", end="")
    while schedule_desc["LastMonitoringExecutionSummary"]["MonitoringExecutionStatus"] == "Pending":
        schedule_desc = model_monitor.describe_schedule()
        print(".", end="", flush=True)
        time.sleep(10)
    print()
    print("Done! Execution has started")
[ ]:
wait_for_execution_to_start(model_bias_monitor)
In the real world, a monitoring schedule is supposed to be active all the time. But in this example, it can be stopped to avoid incurring extra charges. A stopped schedule will not trigger further executions, but the ongoing execution will continue. If needed, the schedule can be restarted by start_monitoring_schedule().
[ ]:
model_bias_monitor.stop_monitoring_schedule()
Wait for the execution to finish
In the previous cell, the first execution has started. This section waits for the execution to finish so that its analysis results are available. Here are the possible terminal states and what each of them means:
* Completed - The monitoring execution completed and no issues were found in the violations report.
* CompletedWithViolations - The execution completed, but constraint violations were detected.
* Failed - The monitoring execution failed, maybe due to a client error (perhaps incorrect role permissions) or infrastructure issues. Further examination of FailureReason and ExitMessage is necessary to identify what exactly happened (a small sketch for this follows the list).
* Stopped - The job exceeded the max runtime or was manually stopped.
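If an execution does end in Failed, a minimal sketch like the one below can surface the failure reason from the schedule description (the ExitMessage, if any, lives on the underlying processing job).
[ ]:
# Hedged sketch: print the failure reason of the last execution, if it failed.
last_execution_summary = model_bias_monitor.describe_schedule().get(
    "LastMonitoringExecutionSummary", {}
)
if last_execution_summary.get("MonitoringExecutionStatus") == "Failed":
    print("FailureReason:", last_execution_summary.get("FailureReason"))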
[ ]:
# Waits for the schedule to have last execution in a terminal status.
def wait_for_execution_to_finish(model_monitor):
    schedule_desc = model_monitor.describe_schedule()
    execution_summary = schedule_desc.get("LastMonitoringExecutionSummary")
    if execution_summary is not None:
        print("Waiting for execution to finish", end="")
        while execution_summary["MonitoringExecutionStatus"] not in [
            "Completed",
            "CompletedWithViolations",
            "Failed",
            "Stopped",
        ]:
            print(".", end="", flush=True)
            time.sleep(60)
            schedule_desc = model_monitor.describe_schedule()
            execution_summary = schedule_desc["LastMonitoringExecutionSummary"]
        print()
        print("Done! Execution has finished")
    else:
        print("Last execution not found")
[ ]:
wait_for_execution_to_finish(model_bias_monitor)
Inspect execution results
List the generated reports,
[ ]:
schedule_desc = model_bias_monitor.describe_schedule()
execution_summary = schedule_desc.get("LastMonitoringExecutionSummary")
if execution_summary and execution_summary["MonitoringExecutionStatus"] in [
    "Completed",
    "CompletedWithViolations",
]:
    last_model_bias_monitor_execution = model_bias_monitor.list_executions()[-1]
    last_model_bias_monitor_execution_report_uri = (
        last_model_bias_monitor_execution.output.destination
    )
    print(f"Report URI: {last_model_bias_monitor_execution_report_uri}")
    last_model_bias_monitor_execution_report_files = sorted(
        S3Downloader.list(last_model_bias_monitor_execution_report_uri)
    )
    print("Found Report Files:")
    print("\n ".join(last_model_bias_monitor_execution_report_files))
else:
    last_model_bias_monitor_execution = None
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )
If there are violations compared to the baseline, they will be listed here.
[ ]:
if last_model_bias_monitor_execution:
    model_bias_violations = last_model_bias_monitor_execution.constraint_violations()
    if model_bias_violations:
        print(model_bias_violations.body_dict)
The analysis results and CloudWatch metrics are visualized in SageMaker Studio. Select the Endpoints tab, then double click the endpoint to show the UI.
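As an optional programmatic alternative, the bias metrics published for this endpoint can also be listed with boto3. This is a hedged sketch: the CloudWatch namespace and dimension name below are assumptions to verify in your account before relying on them.
[ ]:
import boto3

# Hedged sketch: list the Clarify bias metrics published to CloudWatch for this endpoint.
# The namespace "aws/sagemaker/Endpoints/bias-metrics" and the "Endpoint" dimension name
# are assumptions; verify them against the metrics actually emitted in your account.
cloudwatch_client = boto3.client("cloudwatch", region_name=region)
bias_metrics = cloudwatch_client.list_metrics(
    Namespace="aws/sagemaker/Endpoints/bias-metrics",
    Dimensions=[{"Name": "Endpoint", "Value": endpoint_name}],
)
for metric in bias_metrics.get("Metrics", []):
    print(metric["MetricName"])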
PART C: Model Explainability Monitor
The model explainability monitor can explain the predictions of a deployed model producing inferences and detect feature attribution drift on a regular basis.
[ ]:
model_explainability_monitor = ModelExplainabilityMonitor(
role=role,
sagemaker_session=sagemaker_session,
max_runtime_in_seconds=1800,
)
Create a baselining job
Similarly, a baselining job can be run to suggest constraints for the model explainability monitor.
Configuration
In this example, the explainability baselining job shares the test dataset with the bias baselining job, so it uses the same DataConfig; the only difference is the job output URI.
[ ]:
model_explainability_baselining_job_result_uri = f"{baseline_results_uri}/model_explainability"
model_explainability_data_config = DataConfig(
s3_data_input_path=validation_dataset,
s3_output_path=model_explainability_baselining_job_result_uri,
label=label_header,
headers=all_headers,
dataset_type=dataset_type,
)
Currently the Clarify explainer offers a scalable and efficient implementation of SHAP, so the explainability config is SHAPConfig, including:
* baseline: A list of rows (at least one) or an S3 object URI to be used as the baseline dataset in the Kernel SHAP algorithm. The format should be the same as the dataset format. Each row should contain only the feature columns/values and omit the label column/values.
* num_samples: Number of samples to be used in the Kernel SHAP algorithm. This number determines the size of the generated synthetic dataset to compute the SHAP values.
* agg_method: Aggregation method for global SHAP values. Valid values are "mean_abs" (mean of absolute SHAP values for all instances), "median" (median of SHAP values for all instances), and "mean_sq" (mean of squared SHAP values for all instances).
* use_logit: Indicator of whether the logit function is to be applied to the model predictions. Default is False. If "use_logit" is true, then the SHAP values will have log-odds units.
* save_local_shap_values (bool): Indicator of whether to save the local SHAP values in the output location. Default is True.
[ ]:
# Here use the mean value of test dataset as SHAP baseline
test_dataframe = pd.read_csv(test_dataset, header=None)
shap_baseline = [list(test_dataframe.mean())]
shap_config = SHAPConfig(
baseline=shap_baseline,
num_samples=100,
agg_method="mean_abs",
save_local_shap_values=False,
)
Kick off baselining job
The same model_config is required, because the explainability baselining job needs to create a shadow endpoint to get predictions for the generated synthetic dataset.
[ ]:
model_explainability_monitor.suggest_baseline(
data_config=model_explainability_data_config,
model_config=model_config,
explainability_config=shap_config,
)
print(
f"ModelExplainabilityMonitor baselining job: {model_explainability_monitor.latest_baselining_job_name}"
)
Wait for the baselining job to finish (or skip this cell, because the monitor to be scheduled will wait for it anyway).
[ ]:
model_explainability_monitor.latest_baselining_job.wait(logs=False)
model_explainability_constraints = model_explainability_monitor.suggested_constraints()
print()
print(
f"ModelExplainabilityMonitor suggested constraints: {model_explainability_constraints.file_s3_uri}"
)
print(S3Downloader.read_file(model_explainability_constraints.file_s3_uri))
Schedule model explainability monitor
Call the create_monitoring_schedule() method to schedule an hourly monitor to analyze the data. If a baselining job has been submitted, the monitor will automatically pick up the analysis configuration from the baselining job. But if the baselining step is skipped, or if the captured dataset has a different nature than the training dataset, an analysis configuration has to be provided.
ModelConfig is required by ExplainabilityAnalysisConfig for the same reason it is required by the baselining job. Note that only features are required for computing feature attributions, so the ground truth label should be excluded.
[ ]:
model_explainability_analysis_config = None
if not model_explainability_monitor.latest_baselining_job:
    # Remove label because only features are required for the analysis
    headers_without_label_header = copy.deepcopy(all_headers)
    headers_without_label_header.remove(label_header)
    model_explainability_analysis_config = ExplainabilityAnalysisConfig(
        explainability_config=shap_config,
        model_config=model_config,
        headers=headers_without_label_header,
    )
model_explainability_monitor.create_monitoring_schedule(
    analysis_config=model_explainability_analysis_config,
    output_s3_uri=s3_report_path,
    endpoint_input=endpoint_name,
    schedule_cron_expression=schedule_expression,
)
Wait for execution and inspect analysis results
Once created, the schedule is started by default. Here, wait for its first execution to start, then stop the schedule to avoid incurring charges.
[ ]:
wait_for_execution_to_start(model_explainability_monitor)
[ ]:
model_explainability_monitor.stop_monitoring_schedule()
Wait further for the execution to finish, then inspect its analysis results,
[ ]:
wait_for_execution_to_finish(model_explainability_monitor)
[ ]:
schedule_desc = model_explainability_monitor.describe_schedule()
execution_summary = schedule_desc.get("LastMonitoringExecutionSummary")
if execution_summary and execution_summary["MonitoringExecutionStatus"] in [
    "Completed",
    "CompletedWithViolations",
]:
    last_model_explainability_monitor_execution = model_explainability_monitor.list_executions()[-1]
    last_model_explainability_monitor_execution_report_uri = (
        last_model_explainability_monitor_execution.output.destination
    )
    print(f"Report URI: {last_model_explainability_monitor_execution_report_uri}")
    last_model_explainability_monitor_execution_report_files = sorted(
        S3Downloader.list(last_model_explainability_monitor_execution_report_uri)
    )
    print("Found Report Files:")
    print("\n ".join(last_model_explainability_monitor_execution_report_files))
else:
    last_model_explainability_monitor_execution = None
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )
If there are any violations compared to the baseline, they will be listed here.
[ ]:
if last_model_explainability_monitor_execution:
    model_explainability_violations = (
        last_model_explainability_monitor_execution.constraint_violations()
    )
    if model_explainability_violations:
        print(model_explainability_violations.body_dict)
The analysis results and CloudWatch metrics are visualized in SageMaker Studio. Select the Endpoints tab, then double click the endpoint to show the UI.
PART D: Cleanup
The endpoint can keep running and capturing data, but if there is no plan to collect more data or use this endpoint further, it should be deleted to avoid incurring additional charges. Note that deleting the endpoint does not delete the data that was captured during the model invocations; a sketch for removing that data follows the cleanup cells below.
First stop the worker threads,
[ ]:
invoke_endpoint_thread.terminate()
ground_truth_thread.terminate()
Then stop all monitors scheduled for the endpoint
[ ]:
from sagemaker.predictor import Predictor
predictor = Predictor(endpoint_name, sagemaker_session=sagemaker_session)
model_monitors = predictor.list_monitors()
for model_monitor in model_monitors:
    model_monitor.stop_monitoring_schedule()
    wait_for_execution_to_finish(model_monitor)
    model_monitor.delete_monitoring_schedule()
Finally delete the endpoint
[ ]:
predictor.delete_endpoint()
predictor.delete_model()
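The captured data, ground truth data, baselining outputs, and reports remain in S3 under the demo prefix defined at the top of the notebook. If they are no longer needed, a hedged sketch like the following could remove them; skip it if the data should be kept.
[ ]:
import boto3

# Hedged sketch: delete everything written under the demo prefix (captures, ground truth,
# baselining outputs, and reports). This is optional and irreversible.
s3_resource = boto3.resource("s3", region_name=region)
s3_resource.Bucket(bucket).objects.filter(Prefix=prefix).delete()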
Notebook CI Test Results
This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.