Deploying an Amazon Comprehend Model with SageMaker Pipelines

This example notebook shows how you can train and deploy a custom text classification model using Amazon Comprehend and SageMaker Pipelines.

Before you start, make sure that your SageMaker Execution Role has the following policies:

  • ComprehendFullAccess

  • AmazonSageMakerFullAccess

  • AWSLambda_FullAccess

  • IAMFullAccess

Your SageMaker Execution Role should already have access to S3. If it does not, add the AmazonS3FullAccess policy. You also need to add iam:PassRole as an inline policy:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "iam:PassRole" ], "Effect": "Allow", "Resource": "*" } ] }

Finally, you need the following trust policy:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "sagemaker.amazonaws.com", "s3.amazonaws.com", "comprehend.amazonaws.com", "lambda.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }
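Hand-typed policy JSON is easy to break, because a single stray character makes the whole document invalid. As a minimal sketch, you can build both documents as Python dicts and serialize them with json.dumps. The resulting strings are what boto3 IAM calls such as put_role_policy and update_assume_role_policy accept as PolicyDocument. The constant and helper names below are illustrative and are not part of this notebook.

```python
import json

# Inline policy that allows the execution role to pass a role to other services.
PASS_ROLE_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {"Action": ["iam:PassRole"], "Effect": "Allow", "Resource": "*"}
    ],
}

# Trust policy that allows the listed services to assume the role.
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "sagemaker.amazonaws.com",
                    "s3.amazonaws.com",
                    "comprehend.amazonaws.com",
                    "lambda.amazonaws.com",
                ]
            },
            "Action": "sts:AssumeRole",
        }
    ],
}


def policy_json(policy: dict) -> str:
    """Serialize a policy dict to the JSON string that IAM APIs expect."""
    return json.dumps(policy, indent=2)
```

For example, `iam.put_role_policy(RoleName="<your-role>", PolicyName="PassRole", PolicyDocument=policy_json(PASS_ROLE_POLICY))`. Here `iam = boto3.client("iam")`, and the role and policy names are placeholders.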

Prerequisites

First, we import the SageMaker SDK and set some default variables. These include the role used for permissioned execution and the default_bucket used to store model artifacts.

[ ]:
%pip install s3path
%pip install s3fs
[ ]:
%load_ext autoreload
%autoreload 2
[ ]:
import boto3
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role_arn = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()

Dataset

Let’s inspect the train and test datasets we use in this example.

[ ]:
import pandas as pd

trainFrame = pd.read_csv(
    "s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-train.csv",
    header=None,
)
trainFrame
[ ]:
testFrame = pd.read_csv(
    "s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-test.csv",
    header=None,
)
testFrame

A direct S3-to-S3 copy may not work when your notebook runs in a different AWS Region from the source bucket. Hence, we first copy the data from the source bucket to local storage, and then from local storage to the target bucket.

[ ]:
!aws s3 cp s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-train.csv .
!aws s3 cp s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-test.csv .

!aws s3 cp ./comprehend-train.csv s3://$default_bucket/
!aws s3 cp ./comprehend-test.csv s3://$default_bucket/
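You can also do the same download-then-upload copy with boto3 instead of the AWS CLI. This sketch assumes you pass S3 clients created with boto3.client("s3"), optionally using a separate client configured for the source bucket's Region. The helpers copy_via_local and split_s3_uri are hypothetical names.

```python
import os
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/key' into ('bucket', 'key')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


def copy_via_local(source_client, target_client, source_uri: str,
                   target_bucket: str, local_dir: str = ".") -> str:
    """Download source_uri to local_dir, upload it to target_bucket
    under the same file name, and return the new S3 URI."""
    bucket, key = split_s3_uri(source_uri)
    file_name = os.path.basename(key)
    local_path = os.path.join(local_dir, file_name)
    source_client.download_file(bucket, key, local_path)  # source -> local
    target_client.upload_file(local_path, target_bucket, file_name)  # local -> target
    return f"s3://{target_bucket}/{file_name}"
```

Usage, with a single client for both sides: `s3 = boto3.client("s3")` followed by `copy_via_local(s3, s3, "s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-train.csv", default_bucket)`.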

Next, we define parameters that can be set for each execution of the pipeline. They act as variables whose default values can be overridden when an execution starts. We define the following:

  • TrainData: Location of the training data in S3

  • TestData: Location of the test data in S3

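  • ProcessingInstanceCount: Number of instances used by each processing job. The ARN (Amazon Resource Name) of the execution role is not a pipeline parameter; it is passed to the steps as the plain Python variable role_arn.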

  • ModelOutput: Location of the target S3 path for the Amazon Comprehend model artifact

Amazon Comprehend creates its own validation set when training, so there is no need to provide one.

[ ]:
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

input_train = ParameterString(
    name="TrainData",
    default_value=f"s3://{default_bucket}/comprehend-train.csv",
)

input_test = ParameterString(
    name="TestData",
    default_value=f"s3://{default_bucket}/comprehend-test.csv",
)

model_output = ParameterString(name="ModelOutput", default_value=f"s3://{default_bucket}/model")

We use SKLearnProcessor to run the Python scripts that prepare the data and that train and deploy the Amazon Comprehend model using boto3. In the next cell, we instantiate the SKLearnProcessor that the following steps use.

[ ]:
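# Instance type for the processing jobs (assumed value; it is used below but was never defined)
processing_instance_type = "ml.m5.xlarge"

sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",
    instance_type=processing_instance_type,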
    instance_count=processing_instance_count,
    base_job_name="comprehend-process",
    sagemaker_session=sagemaker_session,
    role=role_arn,
)

The first Amazon SageMaker ProcessingStep provides a containerized execution environment to run the prepare_data.py script.

[ ]:
preprocess = ProcessingStep(
    name="ComprehendProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_train, destination="/opt/ml/processing/input_train"),
        ProcessingInput(source=input_test, destination="/opt/ml/processing/input_test"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="prepare_data.py",
)
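This notebook does not show prepare_data.py. The following is a hypothetical sketch of what such a script could do. It reads each CSV from the input directories mounted by the ProcessingInput entries and drops rows with an empty label or text. It then writes the result to the directories exported by the ProcessingOutput entries. The sketch assumes the Comprehend multi-class CSV layout: label in the first column, document text in the second, and no header. The function names are illustrative.

```python
import csv
import os


def clean_rows(rows):
    """Keep [label, text] pairs whose label and text are both non-empty after stripping."""
    cleaned = []
    for row in rows:
        if len(row) < 2:
            continue  # malformed row: no text column
        label, text = row[0].strip(), row[1].strip()
        if label and text:
            cleaned.append([label, text])
    return cleaned


def process_split(input_dir, output_dir):
    """Clean every CSV file in input_dir, write it to output_dir,
    and return the number of rows kept."""
    os.makedirs(output_dir, exist_ok=True)
    kept = 0
    for name in sorted(os.listdir(input_dir)):
        if not name.endswith(".csv"):
            continue
        with open(os.path.join(input_dir, name), newline="", encoding="utf-8") as src:
            rows = clean_rows(csv.reader(src))
        with open(os.path.join(output_dir, name), "w", newline="", encoding="utf-8") as dst:
            csv.writer(dst).writerows(rows)  # written without a header row
        kept += len(rows)
    return kept
```

Inside the processing container, such a script would call `process_split("/opt/ml/processing/input_train", "/opt/ml/processing/train")` and the equivalent for the test split.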

The second Amazon SageMaker processing step trains the Amazon Comprehend model by running train_eval_comprehend.py. Amazon Comprehend automatically evaluates the model on a held-out evaluation set. We use the resulting accuracy as the condition for deploying the model.

[ ]:
evaluation_report = PropertyFile(
    name="ComprehendEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)
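The PropertyFile only works if train_eval_comprehend.py writes evaluation.json into the directory exported as the evaluation output, with an Accuracy key at the top level. The JsonGet in the condition further down reads json_path="Accuracy". Below is a hypothetical sketch of that part of the script. It assumes the script has the DocumentClassifierProperties dict returned by boto3's describe_document_classifier. That dict's ClassifierMetadata.EvaluationMetrics.Accuracy field holds the score Amazon Comprehend computed during training.

```python
import json
import os


def accuracy_from_classifier(properties: dict) -> float:
    """Extract the accuracy from a DescribeDocumentClassifier
    'DocumentClassifierProperties' dict."""
    return float(properties["ClassifierMetadata"]["EvaluationMetrics"]["Accuracy"])


def write_evaluation_report(properties: dict,
                            output_dir: str = "/opt/ml/processing/evaluation") -> str:
    """Write {"Accuracy": ...} to evaluation.json so the PropertyFile can read it."""
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, "evaluation.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"Accuracy": accuracy_from_classifier(properties)}, f)
    return path
```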
[ ]:
comprehend_train_and_eval = ProcessingStep(
    name="ComprehendTrainAndEval",
    processor=sklearn_processor,
    job_arguments=[
        "--train-input-file",
        preprocess.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        "--train-output-path",
        model_output,
        "--iam-role-arn",
        role_arn,
    ],
    code="train_eval_comprehend.py",
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        ProcessingOutput(output_name="arn", source="/opt/ml/processing/arn"),
    ],
    property_files=[evaluation_report],
)

The third Amazon SageMaker processing step deploys the Amazon Comprehend model by running deploy_comprehend.py. If the accuracy reported after training is lower than the threshold set in the condition below (0.65), this step does not run.

[ ]:
step_deploy_model = ProcessingStep(
    name="ComprehendDeploy",
    processor=sklearn_processor,
    job_arguments=[
        "--arn-path",
        comprehend_train_and_eval.properties.ProcessingOutputConfig.Outputs["arn"].S3Output.S3Uri,
    ],
    code="deploy_comprehend.py",
    outputs=[
        ProcessingOutput(output_name="endpoint_arn", source="/opt/ml/processing/endpoint_arn")
    ],
)
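This notebook does not show deploy_comprehend.py either. The following is a minimal hypothetical sketch of its core. It creates a Comprehend endpoint for the trained classifier with boto3's create_endpoint. It then writes the returned ARN to endpoint_arn.txt in the directory exported as the endpoint_arn output, which is the file name the clean-up code at the end of this notebook reads. How the script obtains the model ARN from --arn-path is not shown here, so the sketch takes model_arn as a parameter. The endpoint name and the single inference unit are assumptions.

```python
import os


def deploy_classifier(comprehend_client, model_arn, endpoint_name,
                      output_dir="/opt/ml/processing/endpoint_arn", inference_units=1):
    """Create a Comprehend endpoint for model_arn and record its ARN in endpoint_arn.txt."""
    response = comprehend_client.create_endpoint(
        EndpointName=endpoint_name,
        ModelArn=model_arn,
        DesiredInferenceUnits=inference_units,
    )
    endpoint_arn = response["EndpointArn"]
    # SageMaker uploads this directory to S3 as the "endpoint_arn" output.
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, "endpoint_arn.txt"), "w", encoding="utf-8") as f:
        f.write(endpoint_arn)
    return endpoint_arn
```

create_endpoint returns while the endpoint is still being created. A real script might poll describe_endpoint until its status is IN_SERVICE before finishing.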
[ ]:
comprehend_train_and_eval.properties.ProcessingOutputConfig.Outputs["arn"].S3Output.S3Uri
[ ]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Deploy only if the Accuracy in evaluation.json is greater than or equal to 0.65
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name="ComprehendTrainAndEval",
        property_file=evaluation_report,
        json_path="Accuracy",
    ),
    right=0.65,
)

step_cond = ConditionStep(
    name="ComprehendAccuracyCondition",
    conditions=[cond_gte],
    if_steps=[step_deploy_model],
    else_steps=[],
)
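The ConditionStep is evaluated by SageMaker Pipelines, not in the notebook. The decision it makes is equivalent to the small pure-Python check below, shown only to make the semantics explicit. The check reads Accuracy from the evaluation report and deploys if it is greater than or equal to 0.65. should_deploy is an illustrative name.

```python
import json

ACCURACY_THRESHOLD = 0.65  # matches right=0.65 in the ConditionStep above


def should_deploy(evaluation_json: str, threshold: float = ACCURACY_THRESHOLD) -> bool:
    """Return True when the report's Accuracy is >= threshold,
    mirroring ConditionGreaterThanOrEqualTo."""
    accuracy = float(json.loads(evaluation_json)["Accuracy"])
    return accuracy >= threshold
```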

Finally, the deployed model can be used for inference. At this stage, we use an AWS Lambda function to call the Amazon Comprehend endpoint with text of our choice.

A role is needed to create the Lambda function. We will use a helper function in iam_helper.py from the Lambda Step example to create the role.

[ ]:
import iam_helper

lambda_role_name = "DEMO-test-comprehend-lambda-role"
lambda_role = iam_helper.create_lambda_role(lambda_role_name)
[ ]:
example_text = (
    "Italian EBU Member RAI has won the 65th Eurovision Song Contest with the song "
    + "Zitti e buoni performed by Måneskin. It's the 3rd win for Italy who last triumphed in 1990. "
    + "26 countries took part in the Grand Final of the world’s largest live music event, "
    + "hosted by Dutch EBU Members NPO, NOS and AVROTROS on Saturday 22 May in Rotterdam. "
    + "Måneskin wrote the winning song which finished the night with 524 points, 25 points "
    + "ahead of 2nd placed France represented by Barbara Pravi singing Voila. Switzerland’s Gjon’s Tears with Tout l’Univers finished in third place."
)
[ ]:
# Custom Lambda Step
function_name = "DEMO-sagemaker-lambda-step-endpoint-test"

# Lambda helper class can be used to create the Lambda function
endpoint_lambda = Lambda(
    function_name=function_name,
    execution_role_arn=lambda_role,
    script="test_comprehend_lambda.py",
    handler="test_comprehend_lambda.lambda_handler",
)

test_endpoint = LambdaStep(
    name="LambdaStep",
    lambda_func=endpoint_lambda,
    inputs={
        "endpoint_arn_path": step_deploy_model.properties.ProcessingOutputConfig.Outputs[
            "endpoint_arn"
        ].S3Output.S3Uri,
        "text": example_text,
    },
)
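This notebook does not show test_comprehend_lambda.py. Below is a hypothetical sketch of its handler. The LambdaStep inputs arrive as keys of the event, so the handler reads endpoint_arn.txt under endpoint_arn_path and then calls Comprehend's classify_document on event["text"]. The optional client arguments are an assumption that lets the handler be exercised with stub clients; Lambda itself calls lambda_handler(event, context).

```python
from urllib.parse import urlparse


def lambda_handler(event, context, s3_client=None, comprehend_client=None):
    """Classify event["text"] with the endpoint whose ARN is stored under event["endpoint_arn_path"]."""
    if s3_client is None or comprehend_client is None:
        import boto3  # imported lazily so stub clients work without boto3

        s3_client = s3_client or boto3.client("s3")
        comprehend_client = comprehend_client or boto3.client("comprehend")

    # endpoint_arn_path is the S3 prefix of the deploy step's "endpoint_arn" output.
    parsed = urlparse(event["endpoint_arn_path"])
    key = parsed.path.strip("/") + "/endpoint_arn.txt"
    body = s3_client.get_object(Bucket=parsed.netloc, Key=key)["Body"].read()
    endpoint_arn = body.decode("utf-8").strip()

    result = comprehend_client.classify_document(Text=event["text"], EndpointArn=endpoint_arn)
    return {"statusCode": 200, "classes": result.get("Classes", [])}
```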
[ ]:
step_deploy_model.properties.ProcessingOutputConfig.Outputs["endpoint_arn"].S3Output.S3Uri
[ ]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "DEMO-ComprehendPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        input_train,
        input_test,
        model_output,
    ],
    steps=[preprocess, comprehend_train_and_eval, step_cond, test_endpoint],
)

Once the pipeline is successfully defined, we can start the execution.

[ ]:
pipeline.upsert(role_arn=role_arn)
[ ]:
execution = pipeline.start()
[ ]:
execution.wait(delay=300, max_attempts=25)
[ ]:
execution.list_steps()
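execution.list_steps() returns one dictionary per step, including StepName and StepStatus, plus a FailureReason for failed steps. A small helper, hypothetical and not part of the SDK, can condense that output into one status per step.

```python
def summarize_steps(steps):
    """Return a dict mapping each step name to its status."""
    return {step["StepName"]: step["StepStatus"] for step in steps}


def failure_reasons(steps):
    """Return a dict mapping each failed step's name to its failure reason."""
    return {
        step["StepName"]: step.get("FailureReason", "unknown")
        for step in steps
        if step["StepStatus"] == "Failed"
    }
```

Usage: `summarize_steps(execution.list_steps())` and `failure_reasons(execution.list_steps())`.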

Conclusion

In this notebook, we have seen how to create a SageMaker Pipeline that trains an Amazon Comprehend custom classifier on your own dataset, evaluates it, and deploys it when its accuracy meets a threshold.

Clean up

Note: the Amazon Comprehend classifier endpoint incurs charges for as long as it exists. The Lambda function and IAM role do not incur costs if you leave them in place, as long as the Lambda function is not invoked.

[ ]:
import re
import json
from s3path import S3Path


def get_comprehend_endpoint_arn(execution):
    sagemaker_client = boto3.client("sagemaker")
    step_response = [
        step for step in execution.list_steps() if step["StepName"] == "ComprehendDeploy"
    ][0]
    deploy_job_arn = step_response["Metadata"]["ProcessingJob"]["Arn"]
    deploy_job_name = re.split(":|/", deploy_job_arn)[-1]
    job_response = sagemaker_client.describe_processing_job(ProcessingJobName=deploy_job_name)
    job_path = (
        job_response["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        + "/endpoint_arn.txt"
    )
    with S3Path.from_uri(job_path).open() as f:
        comprehend_arn = f.read()

    return comprehend_arn
[ ]:
# Delete Comprehend endpoint
comprehend_client = boto3.client("comprehend")
comprehend_client.delete_endpoint(EndpointArn=get_comprehend_endpoint_arn(execution))

# Delete Lambda function
lambda_client = boto3.client("lambda")
lambda_client.delete_function(FunctionName=function_name)

# Delete IAM role
iam_helper.delete_lambda_role(lambda_role_name)

# Delete SageMaker Pipeline
pipeline.delete()
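The clean-up above deletes the endpoint but not the trained classifier itself. To remove the classifier as well, you could run the following sketch instead of the delete_endpoint call above. It looks up the endpoint's ModelArn with describe_endpoint and deletes the endpoint. It then waits until describe_endpoint raises ResourceNotFoundException and calls delete_document_classifier. The helper name and polling defaults are illustrative.

```python
import time


def delete_endpoint_and_classifier(comprehend_client, endpoint_arn,
                                   poll_seconds=30, max_polls=40):
    """Delete a Comprehend endpoint, wait until it is gone, then delete its classifier."""
    props = comprehend_client.describe_endpoint(EndpointArn=endpoint_arn)["EndpointProperties"]
    model_arn = props["ModelArn"]
    comprehend_client.delete_endpoint(EndpointArn=endpoint_arn)

    # The classifier cannot be deleted while an endpoint still uses it.
    not_found = comprehend_client.exceptions.ResourceNotFoundException
    for _ in range(max_polls):
        try:
            comprehend_client.describe_endpoint(EndpointArn=endpoint_arn)
        except not_found:
            break  # endpoint fully deleted
        time.sleep(poll_seconds)
    else:
        raise TimeoutError(f"endpoint {endpoint_arn} still exists")

    comprehend_client.delete_document_classifier(DocumentClassifierArn=model_arn)
    return model_arn
```

Usage: `delete_endpoint_and_classifier(comprehend_client, get_comprehend_endpoint_arn(execution))`.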