Deploying an Amazon Comprehend Model with SageMaker Pipelines
This example notebook showcases how you can deploy a custom text classification model using Amazon Comprehend and SageMaker Pipelines.
Before you start, make sure that your SageMaker Execution Role has the following policies attached:
ComprehendFullAccess
AmazonSageMakerFullAccess
AWSLambda_FullAccess
IAMFullAccess
Your SageMaker Execution Role should already have access to S3. If it does not, you can attach the AmazonS3FullAccess policy. You will also need to allow the iam:PassRole action in an inline policy.
Finally, your role will need the following trust policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "sagemaker.amazonaws.com", "s3.amazonaws.com", "comprehend.amazonaws.com", "lambda.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }Prerequisites
First, we import the SageMaker SDK and set some default variables, such as the role for permissioned execution and the default_bucket used to store model artifacts.
[ ]:
%pip install s3path
%pip install s3fs
[ ]:
%load_ext autoreload
%autoreload 2
[ ]:
import boto3
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.lambda_step import (
LambdaStep,
LambdaOutput,
LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role_arn = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
Dataset
Let’s inspect the train and test datasets we will be using in this example.
[ ]:
import pandas as pd
trainFrame = pd.read_csv(
"s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-train.csv",
header=None,
)
trainFrame
[ ]:
testFrame = pd.read_csv(
"s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-test.csv",
header=None,
)
testFrame
A direct S3-to-S3 copy does not work across regions, so we copy the data from the source bucket to local storage first, and then from local storage to the target bucket.
[ ]:
!aws s3 cp s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-train.csv .
!aws s3 cp s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-test.csv .
!aws s3 cp ./comprehend-train.csv s3://$default_bucket/
!aws s3 cp ./comprehend-test.csv s3://$default_bucket/
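If you prefer to stay in Python, here is a minimal sketch of the same copy using boto3 (download to local disk first, then upload, since the source bucket may live in another region):

import boto3

s3 = boto3.client("s3")

for filename in ["comprehend-train.csv", "comprehend-test.csv"]:
    # Download from the public source bucket to local disk ...
    s3.download_file(
        Bucket="aws-ml-blog",
        Key=f"artifacts/comprehend-custom-classification/{filename}",
        Filename=filename,
    )
    # ... and upload into our own default bucket.
    s3.upload_file(Filename=filename, Bucket=default_bucket, Key=filename)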
Next, we define parameters that can be set for the execution of the pipeline; they serve as variables. We define the following:
ProcessingInstanceCount: Number of instances used by the processing steps
TrainData: Location of the training data in S3
TestData: Location of the test data in S3
RoleArn: ARN (Amazon Resource Name) of the role used for pipeline execution
ModelOutput: Location of the target S3 path for the Amazon Comprehend model artifact
Amazon Comprehend creates its own validation set when training, so there is no need to provide one.
[ ]:
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
input_train = ParameterString(
name="TrainData",
default_value=f"s3://{default_bucket}/comprehend-train.csv",
)
input_test = ParameterString(
name="TestData",
default_value=f"s3://{default_bucket}/comprehend-test.csv",
)
model_output = ParameterString(name="ModelOutput", default_value=f"s3://{default_bucket}/model")
We use SKLearnProcessor to run Python scripts that train and deploy Amazon Comprehend models using boto3. In the next cell, we instantiate the SKLearnProcessor that is used in the subsequent steps.
[ ]:
sklearn_processor = SKLearnProcessor(
framework_version="1.0-1",
instance_type="ml.m5.xlarge",
instance_count=processing_instance_count,
base_job_name="comprehend-process",
sagemaker_session=sagemaker_session,
role=role_arn,
)
The first Amazon SageMaker ProcessingStep provides a containerized execution environment to run the prepare_data.py script.
[ ]:
preprocess = ProcessingStep(
name="ComprehendProcess",
processor=sklearn_processor,
inputs=[
ProcessingInput(source=input_train, destination="/opt/ml/processing/input_train"),
ProcessingInput(source=input_test, destination="/opt/ml/processing/input_test"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
],
code="prepare_data.py",
)
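The prepare_data.py script itself ships alongside this notebook. As a rough sketch of the contract it has to fulfil (the file names and any cleaning logic are assumptions; the container paths match the ProcessingInput/ProcessingOutput definitions above), it reads the raw CSVs from the input channels and writes the prepared data to the output channels:

import os

import pandas as pd

# Input channels, as mounted by the ProcessingStep above.
train = pd.read_csv("/opt/ml/processing/input_train/comprehend-train.csv", header=None)
test = pd.read_csv("/opt/ml/processing/input_test/comprehend-test.csv", header=None)

# Any cleaning or transformation would happen here; this sketch passes data through.

# Output channels, uploaded to S3 by SageMaker when the job finishes.
os.makedirs("/opt/ml/processing/train", exist_ok=True)
os.makedirs("/opt/ml/processing/test", exist_ok=True)
train.to_csv("/opt/ml/processing/train/comprehend-train.csv", header=False, index=False)
test.to_csv("/opt/ml/processing/test/comprehend-test.csv", header=False, index=False)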
The second Amazon SageMaker processing step trains the Amazon Comprehend model by running train_eval_comprehend.py. Amazon Comprehend automatically evaluates performance on an evaluation set; we will use that score as a condition for deploying the model.
[ ]:
evaluation_report = PropertyFile(
name="ComprehendEvaluationReport",
output_name="evaluation",
path="evaluation.json",
)
[ ]:
comprehend_train_and_eval = ProcessingStep(
name="ComprehendTrainAndEval",
processor=sklearn_processor,
job_arguments=[
"--train-input-file",
preprocess.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
"--train-output-path",
model_output,
"--iam-role-arn",
role_arn,
],
code="train_eval_comprehend.py",
outputs=[
ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
ProcessingOutput(output_name="arn", source="/opt/ml/processing/arn"),
],
property_files=[evaluation_report],
)
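train_eval_comprehend.py is provided with the example; the sketch below only illustrates the core boto3 calls such a script needs to make (the classifier name and the arn.txt file name are assumptions). It trains the classifier, polls until training completes, then writes the evaluation report, whose Accuracy key the condition step below reads, and the classifier ARN to the output channels:

import argparse
import json
import time

import boto3

parser = argparse.ArgumentParser()
parser.add_argument("--train-input-file", type=str)
parser.add_argument("--train-output-path", type=str)
parser.add_argument("--iam-role-arn", type=str)
args = parser.parse_args()

comprehend = boto3.client("comprehend")

# Launch the custom classifier training job.
response = comprehend.create_document_classifier(
    DocumentClassifierName=f"comprehend-classifier-{int(time.time())}",  # assumed name
    DataAccessRoleArn=args.iam_role_arn,
    InputDataConfig={"S3Uri": args.train_input_file},
    OutputDataConfig={"S3Uri": args.train_output_path},
    LanguageCode="en",
)
classifier_arn = response["DocumentClassifierArn"]

# Poll until training finishes; Comprehend holds out its own evaluation split.
while True:
    props = comprehend.describe_document_classifier(
        DocumentClassifierArn=classifier_arn
    )["DocumentClassifierProperties"]
    if props["Status"] in ("TRAINED", "IN_ERROR"):
        break
    time.sleep(60)

metrics = props["ClassifierMetadata"]["EvaluationMetrics"]

# Write the evaluation report that the PropertyFile/JsonGet condition reads ...
with open("/opt/ml/processing/evaluation/evaluation.json", "w") as f:
    json.dump({"Accuracy": metrics["Accuracy"]}, f)

# ... and the classifier ARN for the deployment step (file name is an assumption).
with open("/opt/ml/processing/arn/arn.txt", "w") as f:
    f.write(classifier_arn)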
The third Amazon SageMaker processing step deploys the Amazon Comprehend model by running deploy_comprehend.py. If the accuracy reported after training is below the threshold (0.65 in this example), this step does not run and the pipeline stops here.
[ ]:
step_deploy_model = ProcessingStep(
name="ComprehendDeploy",
processor=sklearn_processor,
job_arguments=[
"--arn-path",
comprehend_train_and_eval.properties.ProcessingOutputConfig.Outputs["arn"].S3Output.S3Uri,
],
code="deploy_comprehend.py",
outputs=[
ProcessingOutput(output_name="endpoint_arn", source="/opt/ml/processing/endpoint_arn")
],
)
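Like the other scripts, deploy_comprehend.py is shipped with the example. A plausible minimal sketch (the arn.txt file name is an assumption; endpoint_arn.txt matches what the clean-up code at the end of this notebook reads) loads the trained classifier's ARN from the previous step's S3 output, creates a Comprehend endpoint for it, and records the endpoint ARN:

import argparse
import time

import boto3

parser = argparse.ArgumentParser()
parser.add_argument("--arn-path", type=str)  # S3 URI of the 'arn' output above
args = parser.parse_args()

# Read the trained classifier's ARN from S3 ("arn.txt" is an assumed file name).
bucket, _, key = args.arn_path.replace("s3://", "").partition("/")
body = boto3.client("s3").get_object(Bucket=bucket, Key=f"{key}/arn.txt")["Body"]
classifier_arn = body.read().decode("utf-8").strip()

# Create a real-time endpoint for the custom classifier.
comprehend = boto3.client("comprehend")
response = comprehend.create_endpoint(
    EndpointName=f"comprehend-endpoint-{int(time.time())}",  # assumed name
    ModelArn=classifier_arn,
    DesiredInferenceUnits=1,
)

# Persist the endpoint ARN for the Lambda test step and the clean-up code.
with open("/opt/ml/processing/endpoint_arn/endpoint_arn.txt", "w") as f:
    f.write(response["EndpointArn"])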
[ ]:
# Property reference to the training step's 'arn' output; resolved at pipeline runtime
comprehend_train_and_eval.properties.ProcessingOutputConfig.Outputs["arn"].S3Output.S3Uri
[ ]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
# Deploy only if the reported accuracy is at least 0.65.
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name="ComprehendTrainAndEval",
        property_file=evaluation_report,
        json_path="Accuracy",
    ),
    right=0.65,
)
step_cond = ConditionStep(
    name="ComprehendAccuracyCondition",
    conditions=[cond_gte],
    if_steps=[step_deploy_model],
    else_steps=[],
)
Finally, the deployed model can be used for inference. At this stage we use AWS Lambda to call the Amazon Comprehend endpoint with the text of our choice.
A role is needed to create the Lambda function. We will use a helper function in iam_helper.py from the Lambda Step example to create the role.
[ ]:
import iam_helper
lambda_role_name = "DEMO-test-comprehend-lambda-role"
lambda_role = iam_helper.create_lambda_role(lambda_role_name)
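For reference, here is a rough sketch of what such a helper does; this is not the exact iam_helper code, and the attached policies are assumptions chosen to be broad enough for the test. It creates a role that Lambda can assume and attaches the permissions the test function needs:

import json

import boto3

def create_lambda_role_sketch(role_name):
    # Rough equivalent of iam_helper.create_lambda_role (not the actual helper).
    iam = boto3.client("iam")
    role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(
            {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {"Service": "lambda.amazonaws.com"},
                        "Action": "sts:AssumeRole",
                    }
                ],
            }
        ),
    )
    # Basic execution (CloudWatch logs), plus S3 read and Comprehend access
    # so the function can fetch the endpoint ARN and classify text.
    for policy_arn in [
        "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
        "arn:aws:iam::aws:policy/ComprehendFullAccess",
    ]:
        iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    return role["Role"]["Arn"]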
[ ]:
example_text = (
"Italian EBU Member RAI has won the 65th Eurovision Song Contest with the song "
+ "Zitti e buoni performed by Måneskin. It's the 3rd win for Italy who last triumphed in 1990. "
+ "26 countries took part in the Grand Final of the world’s largest live music event, "
+ "hosted by Dutch EBU Members NPO, NOS and AVROTROS on Saturday 22 May in Rotterdam. "
+ "Måneskin wrote the winning song which finished the night with 524 points, 25 points "
+ "ahead of 2nd placed France represented by Barbara Pravi singing Voila. Switzerland’s Gjon’s Tears with Tout l’Univers finished in third place."
)
[ ]:
# Custom Lambda Step
function_name = "DEMO-sagemaker-lambda-step-endpoint-test"
# Lambda helper class can be used to create the Lambda function
endpoint_lambda = Lambda(
function_name=function_name,
execution_role_arn=lambda_role,
script="test_comprehend_lambda.py",
handler="test_comprehend_lambda.lambda_handler",
)
test_endpoint = LambdaStep(
name="LambdaStep",
lambda_func=endpoint_lambda,
inputs={
"endpoint_arn_path": step_deploy_model.properties.ProcessingOutputConfig.Outputs[
"endpoint_arn"
].S3Output.S3Uri,
"text": example_text,
},
)
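test_comprehend_lambda.py is included with the example; its handler receives the two inputs defined above. A plausible minimal sketch (the endpoint_arn.txt file name matches the clean-up code below) reads the endpoint ARN from S3 and classifies the sample text:

import boto3

def lambda_handler(event, context):
    # "endpoint_arn_path" is the S3 URI of the deploy step's output directory.
    s3_uri = event["endpoint_arn_path"] + "/endpoint_arn.txt"
    bucket, _, key = s3_uri.replace("s3://", "").partition("/")
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"]
    endpoint_arn = body.read().decode("utf-8").strip()

    # Call the Comprehend custom classification endpoint with the sample text.
    comprehend = boto3.client("comprehend")
    result = comprehend.classify_document(Text=event["text"], EndpointArn=endpoint_arn)
    return {"statusCode": 200, "body": result["Classes"]}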
[ ]:
# Property reference to the deploy step's 'endpoint_arn' output; resolved at pipeline runtime
step_deploy_model.properties.ProcessingOutputConfig.Outputs["endpoint_arn"].S3Output.S3Uri
[ ]:
from sagemaker.workflow.pipeline import Pipeline
pipeline_name = "DEMO-ComprehendPipeline"
pipeline = Pipeline(
name=pipeline_name,
parameters=[
processing_instance_count,
input_train,
input_test,
model_output,
],
steps=[preprocess, comprehend_train_and_eval, step_cond, test_endpoint],
)
Once the pipeline is successfully defined, we can start the execution.
[ ]:
pipeline.upsert(role_arn=role_arn)
[ ]:
execution = pipeline.start()
[ ]:
execution.wait(delay=300, max_attempts=25)  # poll every 5 minutes, for up to ~2 hours
[ ]:
execution.list_steps()
Conclusion
In this notebook, we have seen how to create a SageMaker Pipeline that trains, deploys, and tests an Amazon Comprehend custom classifier on your own dataset.
Clean up
Note: the Comprehend classifier endpoint incurs charges for as long as it exists. The Lambda function and IAM role do not incur costs if left undeleted (as long as the Lambda function is not invoked), but we clean up all of these resources below.
[ ]:
import re
from s3path import S3Path
def get_comprehend_endpoint_arn(execution):
sagemaker_client = boto3.client("sagemaker")
step_response = [
step for step in execution.list_steps() if step["StepName"] == "ComprehendDeploy"
][0]
deploy_job_arn = step_response["Metadata"]["ProcessingJob"]["Arn"]
deploy_job_name = re.split(":|/", deploy_job_arn)[-1]
job_response = sagemaker_client.describe_processing_job(ProcessingJobName=deploy_job_name)
job_path = (
job_response["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
+ "/endpoint_arn.txt"
)
with S3Path.from_uri(job_path).open() as f:
comprehend_arn = f.read()
return comprehend_arn
[ ]:
# Delete Comprehend endpoint
comprehend_client = boto3.client("comprehend")
comprehend_client.delete_endpoint(EndpointArn=get_comprehend_endpoint_arn(execution))
# Delete Lambda function
lambda_client = boto3.client("lambda")
lambda_client.delete_function(FunctionName=function_name)
# Delete IAM role
iam_helper.delete_lambda_role(lambda_role_name)
# Delete SageMaker Pipeline
pipeline.delete()