Chaining using Ground Truth Streaming Labeling Jobs

You can use a streaming labeling job to perpetually send new data objects to Amazon SageMaker Ground Truth to be labeled. Ground Truth streaming labeling jobs remain active until they are manually stopped or have been idle for more than 10 days. You can intermittently send new data objects to workers while the labeling job is active.

Use this notebook to create a Ground Truth streaming labeling job using any of the built-in task types. You can make the necessary parameter changes for a custom workflow. You can either configure the notebook to create a labeling job using your own input data, or run the notebook in default mode and use the provided image input data. To use your own input data, set DEFAULT to False.

Chaining is a powerful feature that you can use to send the output of one streaming labeling job to another streaming labeling job. This lets you set up a pipeline in which data flows in real time from Job 1 to Job 2, and so on through Job n-1 to Job n.
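The wiring behind a chain can be sketched in plain Python. This is only an illustration of the idea, not part of the notebook's workflow: the helper name plan_chain is hypothetical, and the "-Input" / "-Output" topic names simply follow the naming convention this notebook uses later when it creates SNS topics.

```python
def plan_chain(job_names):
    """Return the SNS topic names each chained streaming job would use.

    The output topic of job k doubles as the input topic of job k+1.
    (Hypothetical helper; topic names follow this notebook's convention.)
    """
    if not job_names:
        return []
    plan = []
    input_topic = f"{job_names[0]}-Input"  # only the first job needs its own input topic
    for name in job_names:
        output_topic = f"{name}-Output"
        plan.append({"job": name, "input_topic": input_topic, "output_topic": output_topic})
        input_topic = output_topic  # the next job listens on this job's output
    return plan


for step in plan_chain(["job-1", "job-2", "job-3"]):
    print(step)
```

Only the first job gets a dedicated input topic; every later job reuses the previous job's output topic.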

In this notebook, we show how you can use two such streaming jobs in a chained fashion. If you set DEFAULT to True, Job 1 is an “Object Detection” job, in which workers draw bounding boxes around objects, and Job 2 is an “Object Detection Adjustment” job, in which workers adjust the bounding boxes drawn in Job 1.

[ ]:
DEFAULT = True

To read more about streaming labeling jobs, see the Amazon SageMaker documentation on Ground Truth Streaming Labeling Jobs.

To learn more about each step in this notebook, refer to Create a Streaming Labeling Job.

Get the latest version of the AWS Python SDK

[ ]:
!pip install -q --upgrade pip
!pip install awscli -q --upgrade
!pip install botocore -q --upgrade
!pip install boto3 -q --upgrade
!pip install sagemaker -q --upgrade

# NOTE: Restart Kernel after the above command
[254]:
import boto3
import botocore
import json
import time
import sagemaker
import re
import os

Prerequisites

You will create some of the resources you need to launch a Ground Truth streaming labeling job in this notebook. You must create the following resources before executing this notebook:

  • A work team. A work team is a group of workers that complete labeling tasks. If you want to preview the worker UI and execute the labeling task you will need to create a private work team, add yourself as a worker to this team, and provide the work team ARN below. If you do not want to use a private or vendor work team ARN, set private_work_team to False to use the Amazon Mechanical Turk workforce. To learn more about private, vendor, and Amazon Mechanical Turk workforces, see Create and Manage Workforces.

    • IMPORTANT: 3D point cloud and video frame labeling jobs only support private and vendor workforces. If you plan to use 3D point cloud or video frame input data, specify a private or vendor workforce below for WORKTEAM_ARN.

  • If you use this notebook to create an image, video frame, or 3D point cloud labeling job, the S3 bucket that you use for this demo must have a CORS policy attached. To learn more about this requirement, and how to attach a CORS policy to an S3 bucket, see CORS Permission Requirement.
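As a rough sketch of the CORS requirement, the following shows one way to attach a CORS configuration with the boto3 S3 client's put_bucket_cors call. The helper names are hypothetical, and the rule values are an assumption based on what the CORS Permission Requirement page describes; check them against that page before use. Note that put_bucket_cors replaces any CORS configuration already on the bucket.

```python
def ground_truth_cors_configuration():
    """Build a CORS configuration that allows GET requests from any origin.

    The rule values are an assumption; confirm them against the
    CORS Permission Requirement documentation.
    """
    return {
        "CORSRules": [
            {
                "AllowedMethods": ["GET"],
                "AllowedOrigins": ["*"],
                "ExposeHeaders": ["Access-Control-Allow-Origin"],
            }
        ]
    }


def attach_cors_policy(s3_client, bucket):
    """Attach the configuration to a bucket, replacing any existing CORS rules."""
    s3_client.put_bucket_cors(
        Bucket=bucket,
        CORSConfiguration=ground_truth_cors_configuration(),
    )
```

Once the S3 client and bucket name are defined later in this notebook, a call such as attach_cors_policy(s3, BUCKET) would apply it.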

[255]:
private_work_team = True  # Set it to False if using Amazon Mechanical Turk Workforce

if private_work_team:
    WORKTEAM_ARN = "<<ADD WORK TEAM ARN HERE>>"
else:
    region = boto3.session.Session().region_name
    WORKTEAM_ARN = f"arn:aws:sagemaker:{region}:394669845002:workteam/public-crowd/default"
print(f"This notebook will use the work team ARN: {WORKTEAM_ARN}")
This notebook will use the work team ARN: <<ADD WORK TEAM ARN HERE>>
[ ]:
# Make sure workteam arn is populated if private work team is chosen
assert WORKTEAM_ARN != "<<ADD WORK TEAM ARN HERE>>"
  • The IAM execution role you used to create this notebook instance must have the following permissions:

    • AWS managed policy AmazonSageMakerGroundTruthExecution. Run the following code-block to see your IAM execution role name. This GIF demonstrates how to add this policy to an IAM role in the IAM console. You can also find instructions in the IAM User Guide: Adding and removing IAM identity permissions.

    • When you create your role, you specify Amazon S3 permissions. Make sure that your IAM role has access to the S3 bucket that you plan to use in this example. If you do not specify an S3 bucket in this notebook, the default bucket in the AWS Region in which you are running this notebook instance will be used. If you do not require granular permissions, you can attach AmazonS3FullAccess to your role.

[ ]:
role = sagemaker.get_execution_role()
role_name = role.split("/")[-1]
print(
    "IMPORTANT: Make sure this execution role has the AWS Managed policy AmazonSageMakerGroundTruthExecution attached."
)
print("********************************************************************************")
print("The IAM execution role name:", role_name)
print("The IAM execution role ARN:", role)
print("********************************************************************************")
[ ]:
# Make sure the bucket is in the same region as this notebook.
BUCKET = "<< YOUR S3 BUCKET NAME >>"

sess = sagemaker.Session()
s3 = boto3.client("s3")

if BUCKET == "<< YOUR S3 BUCKET NAME >>":
    BUCKET = sess.default_bucket()
region = boto3.session.Session().region_name
bucket_region = s3.head_bucket(Bucket=BUCKET)["ResponseMetadata"]["HTTPHeaders"][
    "x-amz-bucket-region"
]
assert (
    bucket_region == region
), f"Your S3 bucket {BUCKET} and this notebook need to be in the same region."
print(f"IMPORTANT: make sure the role {role_name} has access to read and write to this bucket.")
print(
    "********************************************************************************************************"
)
print(f"This notebook will use the following S3 bucket: {BUCKET}")
print(
    "********************************************************************************************************"
)

SNS topics for Input and Output

You can send data objects to your streaming labeling job using Amazon Simple Notification Service (Amazon SNS). Amazon SNS is a web service that coordinates and manages the delivery of messages to and from endpoints (for example, an email address or AWS Lambda function). An Amazon SNS topic acts as a communication channel between two or more endpoints. You use Amazon SNS to send, or publish, new data objects to the topic specified in the CreateLabelingJob parameter SnsTopicArn in InputConfig.

The following cells will create a name for your labeling job and use this name to create Amazon SNS input and output topics. This labeling job name and these topics will be used in your CreateLabelingJob request later in this notebook.

[ ]:
# Job Name
LABELING_JOB_NAME = "GroundTruth-streaming-" + str(int(time.time()))

print("Your labeling job name will be :", LABELING_JOB_NAME)
[ ]:
# Make sure the role has "sns:CreateTopic" access
sns = boto3.client("sns")

# Create Input Topic
input_response = sns.create_topic(Name=LABELING_JOB_NAME + "-Input")
INPUT_SNS_TOPIC = input_response["TopicArn"]
print("input_sns_topic :", INPUT_SNS_TOPIC)

# Create Output Topic
output_response = sns.create_topic(Name=LABELING_JOB_NAME + "-Output")
OUTPUT_SNS_TOPIC = output_response["TopicArn"]
print("output_sns_topic :", OUTPUT_SNS_TOPIC)
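Once the labeling job is running, a data object is sent to it by publishing a message to the input topic. The sketch below assumes the object is a file in Amazon S3 referenced through a "source-ref" key, as in an input manifest entry; the helper names are hypothetical, and sns_client is expected to be a boto3 SNS client such as the sns object created above.

```python
import json


def build_data_object_message(s3_uri):
    """Serialize one data object as the JSON message body (assumes a source-ref entry)."""
    return json.dumps({"source-ref": s3_uri})


def send_data_object(sns_client, topic_arn, s3_uri):
    """Publish a single data object to the streaming job's input topic."""
    return sns_client.publish(
        TopicArn=topic_arn,
        Message=build_data_object_message(s3_uri),
    )
```

For example, send_data_object(sns, INPUT_SNS_TOPIC, f"s3://{BUCKET}/image.jpg") would publish one image, where image.jpg is a placeholder key. Objects published before the labeling job reaches InProgress may not be picked up, so it is safer to publish only after the job has started.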

Choose Labeling Job Type

Ground Truth supports a variety of built-in task types which streamline the process of creating image, text, video, video frame, and 3D point cloud labeling jobs. You can use this notebook on default mode if you do not want to bring your own input data and input manifest file.

If you have input data and an input manifest file in an S3 bucket, set DEFAULT to False, choose the labeling job task type you want to use, and specify the S3 URI of your input manifest file below. The S3 URI looks similar to s3://your-bucket/path-to-input-manifest/input-manifest.manifest. To learn more about each task type, see Built-in Task Types.

Choose Labeling Job Built-In Task Type

Copy one of the following task types and use it to set the value for task_type. If you set DEFAULT to True at the beginning of this notebook, the image bounding box task type will be used by default.

To create a custom labeling workflow, set CUSTOM to True and specify your custom lambda functions to pre-process your input data and process output data in the section Create Custom Labeling Workflow below.

[ ]:
## Choose from following:
## Bounding Box
## Image Classification (Single Label)
## Image Classification (Multi-label)
## Image Semantic Segmentation
## Text Classification (Single Label)
## Text Classification (Multi-label)
## Named Entity Recognition
## Video Classification
## Video Frame Object Detection
## Video Frame Object Tracking
## 3D Point Cloud Object Detection
## 3D Point Cloud Object Tracking
## 3D Point Cloud Semantic Segmentation

task_type = "<<COPY AND PASTE TASK TYPE FROM LIST ABOVE>>"
if DEFAULT:
    task_type = "Bounding Box"
print(f"Your task type: {task_type}")
[ ]:
task_type_map = {
    "Bounding Box": "BoundingBox",
    "Image Classification (Single Label)": "ImageMultiClass",
    "Image Classification (Multi-label)": "ImageMultiClassMultiLabel",
    "Image Semantic Segmentation": "SemanticSegmentation",
    "Text Classification (Single Label)": "TextMultiClass",
    "Text Classification (Multi-label)": "TextMultiClassMultiLabel",
    "Named Entity Recognition": "NamedEntityRecognition",
    "Video Classification": "VideoMultiClass",
    "Video Frame Object Detection": "VideoObjectDetection",
    "Video Frame Object Tracking": "VideoObjectTracking",
    "3D Point Cloud Object Detection": "3DPointCloudObjectDetection",
    "3D Point Cloud Object Tracking": "3DPointCloudObjectTracking",
    "3D Point Cloud Semantic Segmentation": "3DPointCloudSemanticSegmentation",
}


arn_region_map = {
    "us-west-2": "081040173940",
    "us-east-1": "432418664414",
    "us-east-2": "266458841044",
    "eu-west-1": "568282634449",
    "eu-west-2": "487402164563",
    "ap-northeast-1": "477331159723",
    "ap-northeast-2": "845288260483",
    "ca-central-1": "918755190332",
    "eu-central-1": "203001061592",
    "ap-south-1": "565803892007",
    "ap-southeast-1": "377565633583",
    "ap-southeast-2": "454466003867",
}
[252]:
task_type_suffix = task_type_map[task_type]
region_account = arn_region_map[region]
PRE_HUMAN_TASK_LAMBDA = f"arn:aws:lambda:{region}:{region_account}:function:PRE-{task_type_suffix}"
POST_ANNOTATION_LAMBDA = f"arn:aws:lambda:{region}:{region_account}:function:ACS-{task_type_suffix}"
print(PRE_HUMAN_TASK_LAMBDA)
print(POST_ANNOTATION_LAMBDA)
arn:aws:lambda:us-west-2:081040173940:function:PRE-BoundingBox
arn:aws:lambda:us-west-2:081040173940:function:ACS-BoundingBox

3D point cloud and video frame task types have special requirements. The following variables will be used to configure your labeling job for these task types. To learn more, see the following topics in the documentation:

  • 3D Point Cloud Labeling Jobs Overview

  • Video Frame Labeling Job Overview

[ ]:
point_cloud_task = re.search(r"Point Cloud", task_type) is not None
video_frame_task = re.search(r"Video Frame", task_type) is not None

Create Custom Labeling Workflow

If you want to create a custom labeling workflow, you can create your own lambda functions to pre-process your input data and post-process the labels returned from workers. To learn more, see Step 3: Processing with AWS Lambda.

To use this notebook to run a custom flow, set CUSTOM to True and specify your pre- and post-processing lambdas below.

[ ]:
CUSTOM = False
if CUSTOM:
    PRE_HUMAN_TASK_LAMBDA = "<ADD-PRE-PROCESSING-LAMBDA-ARN>"
    POST_ANNOTATION_LAMBDA = "<ADD-POST-PROCESSING-LAMBDA-ARN>"

Specify Labels

You specify the labels that you want workers to use to annotate your data in a label category configuration file. When you create a 3D point cloud or video frame labeling job, you can add label category attributes to your label category configuration file. Workers can assign one or more attributes to annotations to give more information about that object.

For all task types, you can use the following cell to identify the labels you use for your labeling job. To create a label category configuration file with label category attributes, see Create a Labeling Category Configuration File with Label Category Attributes in the Amazon SageMaker developer guide.

[ ]:
# Add label categories of your choice
LABEL_CATEGORIES = []

if DEFAULT:
    LABEL_CATEGORIES = ["Pedestrian", "Street Car", "Biker"]

The following cell will create a label category configuration file using the labels specified above.

IMPORTANT: Make sure you have added label categories above and they appear under labels when you run the following cell.

[ ]:
# Specify labels and this notebook will create a label category configuration file and upload it to S3.
json_body = {
    "document-version": "2018-11-28",
    "labels": [{"label": label} for label in LABEL_CATEGORIES],
}
with open("class_labels.json", "w") as f:
    json.dump(json_body, f)

print("Your label category configuration file:")
print("\n", json.dumps(json_body, indent=2))
[ ]:
s3.upload_file("class_labels.json", BUCKET, "class_labels.json")
[ ]:
LABEL_CATEGORIES_S3_URI = f"s3://{BUCKET}/class_labels.json"
print(f"You should now see class_labels.json in {LABEL_CATEGORIES_S3_URI}")

Create A Worker Task Template

Part or all of your images will be annotated by human annotators. It is essential to provide good instructions. Good instructions are:

  1. Concise. We recommend limiting verbal/textual instruction to two sentences and focusing on clear visuals.

  2. Visual. In the case of object detection, we recommend providing several labeled examples with different numbers of boxes.

When used through the AWS Console, Ground Truth helps you create the instructions using a visual wizard. When using the API, you need to create an HTML template for your instructions.

NOTE: If you use any images in your template (as we do), they need to be publicly accessible. You can enable public access to files in your S3 bucket through the S3 Console, as described in S3 Documentation.

Specify Resources Used for Human Task UI

The human task user interface (UI) is the interface that human workers use to label your data. Depending on the type of labeling job you create, you will specify a resource that is used to generate the human task UI in the UiConfig parameter of CreateLabelingJob.

For 3D point cloud and video frame labeling tasks, you will specify a pre-defined HumanTaskUiARN. For all other labeling job task types, you will specify a UiTemplateS3Uri.

Bounding Box Image Labeling Job (Default)

If you set DEFAULT to True, use the following to create a worker task template and upload it to your S3 bucket. Ground Truth uses this template to generate your human task UI.

[ ]:
from IPython.display import HTML, display


def make_template(save_fname="instructions.template"):
    template = r"""<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
    <crowd-form>
      <crowd-bounding-box
        name="boundingBox"
        src="{{{{ task.input.taskObject | grant_read_access }}}}"
        header="Dear Annotator, please draw a tight box around each object you see (if there are more than 8 objects, draw boxes around at least 8)."
        labels="{{{{ task.input.labels | to_json | escape }}}}"
      >
        <full-instructions header="Please annotate each object">

    <ol>
        <li><strong>Inspect</strong> the image</li>
        <li><strong>Determine</strong> if the specified label is/are visible in the picture.</li>
        <li><strong>Outline</strong> each instance of the specified label in the image using the provided “Box” tool.</li>
    </ol>


        </full-instructions>
        <short-instructions>
         <ul>
        <li>Boxes should fit tightly around each object</li>
        <li>Do not include parts of the object that are overlapping or that cannot be seen, even though you think you can interpolate the whole shape.</li>
        <li>Avoid including shadows.</li>
        <li>If the target is off screen, draw the box up to the edge of the image.</li>
        </ul>
        </short-instructions>
      </crowd-bounding-box>
    </crowd-form>

    """.format()
    with open(save_fname, "w") as f:
        f.write(template)


if DEFAULT:
    make_template(save_fname="instructions.template")
[ ]:
if DEFAULT:
    result = s3.upload_file("instructions.template", BUCKET, "instructions.template")

Image, Text, and Custom Labeling Jobs (Non Default)

For all image and text based built-in task types, you can find a sample worker task template on that task type page. Find the page for your task type on Built-in Task Types. You will see an example template under the section Create a {Insert-Task-Type} Job (API).

Update <full-instructions></full-instructions> and <short-instructions></short-instructions>. Add your template to the following code block and run the code blocks below to generate your worker task template and upload it to your S3 bucket.

For custom labeling workflows, you can provide a custom HTML worker task template using Crowd HTML Elements. To learn more, see Step 2: Creating your custom labeling task template.

Ground Truth uses this template to generate your human task UI.

Important: If you use the following make_template function to create and upload a worker task template to Amazon S3, you must add an extra pair of {} brackets around each Liquid element. For example, if the template contains {{ task.input.labels | to_json | escape }}, this line should look as follows in the make_template variable template: {{{{ task.input.labels | to_json | escape }}}}.
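The reason for the doubled braces is that make_template calls str.format() on the template string, and str.format() treats {{ and }} as escapes for a single literal brace. A small stand-alone check, using the same Liquid element as the example above:

```python
# Each "{{" in the source collapses to "{" and each "}}" to "}" when .format() runs,
# so four braces in the Python source become the two braces Liquid expects.
liquid_in_source = "{{{{ task.input.labels | to_json | escape }}}}"
rendered = liquid_in_source.format()
print(rendered)  # {{ task.input.labels | to_json | escape }}
```

If the template were written with only two braces, .format() would collapse them to a single pair and the Liquid element would no longer be valid in the uploaded file.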

[ ]:
from IPython.display import HTML, display


def make_template(save_fname="instructions.template"):
    template = r"""
    <<<ADD-TEMPLATE-HTML-CODE-HERE>>>
    """.format()
    with open(save_fname, "w") as f:
        f.write(template)


# This will upload your template to S3 if you are not running on DEFAULT mode, and if your task type
# does not use video frames or 3D point clouds.
if not DEFAULT and not video_frame_task and not point_cloud_task:
    # Save under the same file name that is uploaded on the next line.
    make_template(save_fname="instructions.template")
    s3.upload_file("instructions.template", BUCKET, "instructions.template")

3D Point Cloud and Video Frame Task Types

If you are creating a 3D point cloud or video frame task type, your worker UI is configured by Ground Truth. If you chose one of these task types above, the following cell will specify the correct template.

[ ]:
import re

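if not DEFAULT:
    # HUMAN_UI_ARN is only needed for 3D point cloud and video frame task types.
    HUMAN_UI_ARN = None
    if point_cloud_task:
        task_type_suffix_humanuiarn = task_type_suffix.split("3D")[-1]
        HUMAN_UI_ARN = (
            f"arn:aws:sagemaker:{region}:394669845002:human-task-ui/{task_type_suffix_humanuiarn}"
        )
    if video_frame_task:
        HUMAN_UI_ARN = f"arn:aws:sagemaker:{region}:394669845002:human-task-ui/{task_type_suffix}"
    # Avoid a NameError for image and text task types, which do not use a Human Task UI ARN.
    if HUMAN_UI_ARN is not None:
        print(f"The Human Task UI ARN is: {HUMAN_UI_ARN}")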

(Optional) Create an Input Manifest File

You can optionally specify an input manifest file Amazon S3 URI in ManifestS3Uri when you create the streaming labeling job. Ground Truth sends each data object in the manifest file to workers for labeling as soon as the labeling job starts.

Each line in an input manifest file is an entry containing an object, or a reference to an object, to label. An entry can also contain labels from previous jobs and for some task types, additional information.

To learn how to create an input manifest file, see Use an Input Manifest File. Copy the S3 URI of the file below.
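As a minimal sketch of the format described above, the following writes a manifest with one JSON object per line. It assumes every data object is a file in Amazon S3 referenced by a "source-ref" key; the function names and the local file name input.manifest are hypothetical.

```python
import json


def manifest_lines(s3_uris):
    """Return one JSON-encoded manifest entry per S3 object URI."""
    return [json.dumps({"source-ref": uri}) for uri in s3_uris]


def write_input_manifest(s3_uris, path="input.manifest"):
    """Write the entries to a local file, one entry per line (JSON Lines)."""
    with open(path, "w") as f:
        for line in manifest_lines(s3_uris):
            f.write(line + "\n")
    return path
```

The resulting file can then be uploaded to your bucket, and its S3 URI used as the value of INPUT_MANIFEST.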

[ ]:
# [Optional] The path in Amazon S3 to your input manifest file.
INPUT_MANIFEST = ""

Specify Parameters for Labeling Job

If you set DEFAULT to False, you must specify the following parameters. These will be used to configure and create your labeling job. If you set DEFAULT to True, default parameters will be used.

To learn more about these parameters, use the following documentation:

  • TaskTitle

  • TaskDescription

  • TaskKeywords

[ ]:
TASK_TITLE = "<<ADD-TASK-TITLE>>"

if DEFAULT:
    TASK_TITLE = "Add bounding boxes to detect objects in an image"

TASK_DESCRIPTION = "<<ADD-TASK-DESCRIPTION>>"
if DEFAULT:
    TASK_DESCRIPTION = "Categorize images into classes using bounding boxes"

# Keywords for your task, in a string-array. ex) ['image classification', 'image dataset']
TASK_KEYWORDS = ["<<ADD-KEYWORDS>>"]
if DEFAULT:
    TASK_KEYWORDS = ["bounding box", "image dataset"]
[ ]:
# The path in Amazon S3 to your worker task template or human task UI
HUMAN_UI = []
if point_cloud_task or video_frame_task:
    HUMAN_TASK_UI_ARN = HUMAN_UI_ARN
    HUMAN_UI.append(HUMAN_TASK_UI_ARN)
    UI_CONFIG_PARAM = "HumanTaskUiArn"
else:
    UI_TEMPLATE_S3_URI = f"s3://{BUCKET}/instructions.template"
    HUMAN_UI.append(UI_TEMPLATE_S3_URI)
    UI_CONFIG_PARAM = "UiTemplateS3Uri"

print(f"{UI_CONFIG_PARAM} resource that will be used: {HUMAN_UI[0]}")
[ ]:
# The ARN for your SNS input topic.
INPUT_TOPIC_ARN = INPUT_SNS_TOPIC

# The ARN for your SNS output topic.
OUTPUT_TOPIC_ARN = OUTPUT_SNS_TOPIC

# If you want to store your output manifest in a different folder, change OUTPUT_FOLDER_PREFIX.
OUTPUT_FOLDER_PREFIX = "/gt-streaming-demo-output"
OUTPUT_BUCKET = "s3://" + BUCKET + OUTPUT_FOLDER_PREFIX
print("Your output data will be stored in:", OUTPUT_BUCKET)

# An IAM role with the AmazonSageMakerGroundTruthExecution policy attached.
# This must be the same role that you used to create this notebook instance.
ROLE_ARN = role

Use the CreateLabelingJob API to create a streaming labeling job [Job 1]

[ ]:
if (
    re.search(r"Semantic Segmentation", task_type) is not None
    # re.search (not re.match) so that "3D Point Cloud Object Tracking" is matched mid-string.
    or re.search(r"Object Tracking", task_type) is not None
    or video_frame_task
):
    LABEL_ATTRIBUTE_NAME = LABELING_JOB_NAME + "-ref"
else:
    LABEL_ATTRIBUTE_NAME = LABELING_JOB_NAME

human_task_config = {
    "PreHumanTaskLambdaArn": PRE_HUMAN_TASK_LAMBDA,
    "MaxConcurrentTaskCount": 100,  # Maximum of 100 objects will be available to the workteam at any time
    "NumberOfHumanWorkersPerDataObject": 1,  # We will obtain 1 human annotation for each image.
    "TaskAvailabilityLifetimeInSeconds": 21600,  # Your workteam has 6 hours to complete all pending tasks.
    "TaskDescription": TASK_DESCRIPTION,
    # If using public workforce, specify "PublicWorkforceTaskPrice"
    "WorkteamArn": WORKTEAM_ARN,
    "AnnotationConsolidationConfig": {"AnnotationConsolidationLambdaArn": POST_ANNOTATION_LAMBDA},
    "TaskKeywords": TASK_KEYWORDS,
    "TaskTimeLimitInSeconds": 600,  # Each image must be labeled within 10 minutes.
    "TaskTitle": TASK_TITLE,
    "UiConfig": {UI_CONFIG_PARAM: HUMAN_UI[0]},
}

# If you are using the Amazon Mechanical Turk workforce, specify the amount you want to pay a
# worker to label a data object. See https://aws.amazon.com/sagemaker/groundtruth/pricing/ for recommendations.
if not private_work_team:
    human_task_config["PublicWorkforceTaskPrice"] = {
        "AmountInUsd": {
            "Dollars": 0,
            "Cents": 3,
            "TenthFractionsOfACent": 6,
        }
    }
# "WorkteamArn" is already set in human_task_config above for both workforce types.

ground_truth_request = {
    "InputConfig": {"DataSource": {"SnsDataSource": {"SnsTopicArn": INPUT_TOPIC_ARN}}},
    "HumanTaskConfig": human_task_config,
    "LabelAttributeName": LABEL_ATTRIBUTE_NAME,
    "LabelCategoryConfigS3Uri": LABEL_CATEGORIES_S3_URI,
    "LabelingJobName": LABELING_JOB_NAME,
    "OutputConfig": {"S3OutputPath": OUTPUT_BUCKET, "SnsTopicArn": OUTPUT_TOPIC_ARN},
    "RoleArn": ROLE_ARN,
}

# Compare by value: "is not" tests object identity and is unreliable for strings.
if INPUT_MANIFEST != "":
    ground_truth_request["InputConfig"]["DataSource"]["S3DataSource"] = {
        "ManifestS3Uri": INPUT_MANIFEST
    }
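The PublicWorkforceTaskPrice structure in the cell above splits one price into dollars, cents, and tenths of a cent, so the default values 0, 3, and 6 amount to USD 0.036 per task. The hypothetical helper below sketches that conversion with decimal arithmetic; it does not check any limits that the API may impose on each field.

```python
from decimal import Decimal


def to_public_workforce_task_price(usd):
    """Split a USD amount (string or number) into the three price fields."""
    tenths_of_a_cent = Decimal(str(usd)) * 1000
    if tenths_of_a_cent < 0 or tenths_of_a_cent != tenths_of_a_cent.to_integral_value():
        raise ValueError("price must be a non-negative multiple of 0.001 USD")
    dollars, remainder = divmod(int(tenths_of_a_cent), 1000)
    cents, tenths = divmod(remainder, 10)
    return {
        "AmountInUsd": {
            "Dollars": dollars,
            "Cents": cents,
            "TenthFractionsOfACent": tenths,
        }
    }


print(to_public_workforce_task_price("0.036"))
```

Passing the price as a string avoids binary floating-point rounding before the split.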

You should not share explicit, confidential, or personal information or protected health information with the Amazon Mechanical Turk workforce.

If you are using Amazon Mechanical Turk workforce, you must verify that your data is free of personal, confidential, and explicit content and protected health information using this code cell.

[ ]:
if not private_work_team:
    ground_truth_request["InputConfig"]["DataAttributes"] = {
        "ContentClassifiers": ["FreeOfPersonallyIdentifiableInformation", "FreeOfAdultContent"]
    }
[ ]:
print("Your create labeling job request:\n", json.dumps(ground_truth_request, indent=4))
[ ]:
sagemaker_client = boto3.client("sagemaker")
sagemaker_client.create_labeling_job(**ground_truth_request)

Use the DescribeLabelingJob API to describe a streaming labeling job

[ ]:
sagemaker_client.describe_labeling_job(LabelingJobName=LABELING_JOB_NAME)

Wait until the labeling job status equals InProgress before moving forward in this notebook

[ ]:
sagemaker_client.describe_labeling_job(LabelingJobName=LABELING_JOB_NAME)["LabelingJobStatus"]
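Instead of re-running the cell above by hand, the status can be polled in a loop. This is a sketch under assumptions: the helper name, poll interval, and timeout are hypothetical, and the set of terminal statuses reflects the LabelingJobStatus values documented for DescribeLabelingJob.

```python
import time


def wait_for_in_progress(sagemaker_client, job_name, poll_seconds=15, timeout_seconds=600):
    """Poll DescribeLabelingJob until the job reports InProgress.

    Raises RuntimeError if the job reaches a terminal or stopping state first,
    and TimeoutError if the deadline passes.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = sagemaker_client.describe_labeling_job(LabelingJobName=job_name)[
            "LabelingJobStatus"
        ]
        if status == "InProgress":
            return status
        if status in ("Failed", "Stopping", "Stopped", "Completed"):
            raise RuntimeError(f"Labeling job {job_name} is in status {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Labeling job {job_name} is still in status {status}")
        time.sleep(poll_seconds)
```

In this notebook it could be called as wait_for_in_progress(sagemaker_client, LABELING_JOB_NAME).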

Check LabelingJobStatus and interpret the describe response

  • If you specified “S3DataSource.ManifestS3Uri” in the above request, the objects in the S3 manifest file will automatically make their way to the labeling job. You will see the label counters increment as the objects from the file are processed.

  • Streaming jobs create an SQS queue in your account. You can check that the queue exists, under the name “GroundTruth-” followed by the lowercased labeling job name, in the console or with the following command.

[ ]:
sqs = boto3.client("sqs")
response = sqs.get_queue_url(QueueName="GroundTruth-" + LABELING_JOB_NAME.lower())
print("Queue url is :", response["QueueUrl"])

Job 2 Setup

Use the following section to set up your second labeling job. This labeling job will be chained to the first job that you set up above. This means the output data from the first labeling job will be sent to this labeling job as input data.

Bounding box, semantic segmentation, and all video frame and 3D point cloud labeling job types support an adjustment task, which you can use to have workers modify and add to the annotations created in the first labeling job for that respective task type. You can select one of these adjustment task types below.

If you do not choose an adjustment task type, the output data from this second job will contain any new labels that workers add, as well as the labels added in the first labeling job.

[ ]:
# Job Name
LABELING_JOB_NAME2 = "GroundTruth-streaming-" + str(int(time.time()))

print("Your labeling job 2 name will be :", LABELING_JOB_NAME2)

SNS topics for Input and Output for Job 2

The input SNS topic for Job 2 is the same as the output SNS topic of Job 1. This is how chaining is set up. Only a new output SNS topic needs to be created for Job 2.

[ ]:
# Input topic: reuse the output topic of Job 1 (no new input topic is created).
INPUT_SNS_TOPIC2 = OUTPUT_SNS_TOPIC
print("input_sns_topic of Job 2:", INPUT_SNS_TOPIC2)

# Create Output Topic
output_response = sns.create_topic(Name=LABELING_JOB_NAME2 + "-Output")
OUTPUT_SNS_TOPIC2 = output_response["TopicArn"]
print("output_sns_topic of Job 2:", OUTPUT_SNS_TOPIC2)

# The ARN for your SNS input topic.
INPUT_TOPIC_ARN2 = INPUT_SNS_TOPIC2

# The ARN for your SNS output topic.
OUTPUT_TOPIC_ARN2 = OUTPUT_SNS_TOPIC2

Choose Labeling Job Type [Job2]

Ground Truth supports a variety of built-in task types which streamline the process of creating image, text, video, video frame, and 3D point cloud labeling jobs. You can use this notebook on default mode if you do not want to bring your own input data and input manifest file.

If you have input data and an input manifest file in an S3 bucket, set DEFAULT to False, choose the labeling job task type you want to use, and specify the S3 URI of your input manifest file below. The S3 URI looks similar to s3://your-bucket/path-to-input-manifest/input-manifest.manifest. To learn more about each task type, see Built-in Task Types.

Choose Labeling Job Built-In Task Type

Copy one of the following task types and use it to set the value for task_type2. If you set DEFAULT to True at the beginning of this notebook, the bounding box adjustment task type will be used by default.

[ ]:
## Choose from following:
## Bounding Box
## Image Classification (Single Label)
## Image Classification (Multi-label)
## Image Semantic Segmentation
## Text Classification (Single Label)
## Text Classification (Multi-label)
## Named Entity Recognition
## Video Classification
## Video Frame Object Detection
## Video Frame Object Tracking
## 3D Point Cloud Object Detection
## 3D Point Cloud Object Tracking
## 3D Point Cloud Semantic Segmentation
## Adjustment Semantic Segmentation
## Verification Semantic Segmentation
## Verification Bounding Box
## Adjustment Bounding Box
## Adjustment Video Object Detection
## Adjustment Video Object Tracking
## Adjustment 3D Point Cloud Object Detection
## Adjustment 3D Point Cloud Object Tracking
## Adjustment 3D Point Cloud Semantic Segmentation

task_type2 = "<<COPY AND PASTE TASK TYPE FROM LIST ABOVE>>"
if DEFAULT:
    task_type2 = "Adjustment Bounding Box"
print(f"Your task type: {task_type2}")

The following cells will configure the Lambda functions that Ground Truth uses to pre-process your input data and post-process your output data. These cells will configure your PreHumanTaskLambdaArn and AnnotationConsolidationLambdaArn.

[ ]:
task_type_map2 = {
    "Bounding Box": "BoundingBox",
    "Image Classification (Single Label)": "ImageMultiClass",
    "Image Classification (Multi-label)": "ImageMultiClassMultiLabel",
    "Image Semantic Segmentation": "SemanticSegmentation",
    "Text Classification (Single Label)": "TextMultiClass",
    "Text Classification (Multi-label)": "TextMultiClassMultiLabel",
    "Named Entity Recognition": "NamedEntityRecognition",
    "Video Classification": "VideoMultiClass",
    "Video Frame Object Detection": "VideoObjectDetection",
    "Video Frame Object Tracking": "VideoObjectTracking",
    "3D Point Cloud Object Detection": "3DPointCloudObjectDetection",
    "3D Point Cloud Object Tracking": "3DPointCloudObjectTracking",
    "3D Point Cloud Semantic Segmentation": "3DPointCloudSemanticSegmentation",
    "Adjustment Semantic Segmentation": "AdjustmentSemanticSegmentation",
    "Verification Semantic Segmentation": "VerificationSemanticSegmentation",
    "Verification Bounding Box": "VerificationBoundingBox",
    "Adjustment Bounding Box": "AdjustmentBoundingBox",
    "Adjustment Video Object Detection": "AdjustmentVideoObjectDetection",
    "Adjustment Video Object Tracking": "AdjustmentVideoObjectTracking",
    "Adjustment 3D Point Cloud Object Detection": "Adjustment3DPointCloudObjectDetection",
    "Adjustment 3D Point Cloud Object Tracking": "Adjustment3DPointCloudObjectTracking",
    "Adjustment 3D Point Cloud Semantic Segmentation": "Adjustment3DPointCloudSemanticSegmentation",
}


arn_region_map = {
    "us-west-2": "081040173940",
    "us-east-1": "432418664414",
    "us-east-2": "266458841044",
    "eu-west-1": "568282634449",
    "eu-west-2": "487402164563",
    "ap-northeast-1": "477331159723",
    "ap-northeast-2": "845288260483",
    "ca-central-1": "918755190332",
    "eu-central-1": "203001061592",
    "ap-south-1": "565803892007",
    "ap-southeast-1": "377565633583",
    "ap-southeast-2": "454466003867",
}
[253]:
task_type_suffix2 = task_type_map2[task_type2]
region_account = arn_region_map[region]
PRE_HUMAN_TASK_LAMBDA2 = (
    f"arn:aws:lambda:{region}:{region_account}:function:PRE-{task_type_suffix2}"
)
POST_ANNOTATION_LAMBDA2 = (
    f"arn:aws:lambda:{region}:{region_account}:function:ACS-{task_type_suffix2}"
)
print(PRE_HUMAN_TASK_LAMBDA2)
print(POST_ANNOTATION_LAMBDA2)
arn:aws:lambda:us-west-2:081040173940:function:PRE-AdjustmentBoundingBox
arn:aws:lambda:us-west-2:081040173940:function:ACS-AdjustmentBoundingBox

3D point cloud and video frame task types have special requirements. The following variables will be used to configure your labeling job for these task types. To learn more, see the following topics in the documentation:

  • 3D Point Cloud Labeling Jobs Overview

  • Video Frame Labeling Job Overview

[ ]:
import re  # needed by the searches below; harmless if already imported earlier

point_cloud_task = re.search(r"Point Cloud", task_type) is not None
video_frame_task = re.search(r"Video Frame", task_type) is not None

Create Custom Labeling Workflow

If you want to create a custom labeling workflow, you can create your own lambda functions to pre-process your input data and post-process the labels returned from workers. To learn more, see Step 3: Processing with AWS Lambda.

To use this notebook to run a custom flow, set CUSTOM to True and specify your pre- and post-processing lambdas below.

[ ]:
CUSTOM = False
if CUSTOM:
    PRE_HUMAN_TASK_LAMBDA2 = "<ADD-PRE-PROCESSING-LAMBDA-ARN>"
    POST_ANNOTATION_LAMBDA2 = "<ADD-POST-PROCESSING-LAMBDA-ARN>"

Specify Labels for Job 2

You specify the labels that you want workers to use to annotate your data in a label category configuration file. When you create a 3D point cloud or video frame labeling job, you can add label category attributes to your label category configuration file. Workers can assign one or more attributes to an annotation to give more information about that object.

For all task types, you can use the following cell to identify the labels you use for your labeling job. To create a label category configuration file with label category attributes, see Create a Labeling Category Configuration File with Label Category Attributes in the Amazon SageMaker developer guide.

[ ]:
# Add label categories of your choice
LABEL_CATEGORIES = []
if DEFAULT:
    LABEL_CATEGORIES = ["Pedestrian", "Street Car", "Biker"]

The following cell will create a label category configuration file using the labels specified above.

IMPORTANT: Make sure you have added label categories above and they appear under labels when you run the following cell.

[ ]:
# Specify labels and this notebook will create a label category configuration file and upload it to S3.
json_body = {
    "document-version": "2018-11-28",
    "labels": [{"label": label} for label in LABEL_CATEGORIES],
}
with open("class_labels2.json", "w") as f:
    json.dump(json_body, f)

print("Your label category configuration file:")
print("\n", json.dumps(json_body, indent=2))
[ ]:
s3.upload_file("class_labels2.json", BUCKET, "class_labels2.json")
[ ]:
LABEL_CATEGORIES_S3_URI2 = f"s3://{BUCKET}/class_labels2.json"
print(f"You should now see class_labels2.json in {LABEL_CATEGORIES_S3_URI2}")
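
Because an empty label list produces a configuration file with no labels, it can help to validate the list before writing the file. The following is a minimal sketch; build_label_config is a hypothetical helper that is not part of this notebook, and it produces the same JSON structure as the cell above.

```python
import json


def build_label_config(label_categories):
    """Build the label category configuration body used in this notebook."""
    if not label_categories:
        raise ValueError("LABEL_CATEGORIES is empty; add at least one label.")
    if len(set(label_categories)) != len(label_categories):
        raise ValueError("LABEL_CATEGORIES contains duplicate labels.")
    return {
        "document-version": "2018-11-28",
        "labels": [{"label": label} for label in label_categories],
    }


config = build_label_config(["Pedestrian", "Street Car", "Biker"])
print(json.dumps(config, indent=2))
```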

Create A Worker Task Template for Job 2

Part or all of your images will be annotated by human annotators. It is essential to provide good instructions. Good instructions are:

  1. Concise. We recommend limiting verbal/textual instruction to two sentences and focusing on clear visuals.

  2. Visual. In the case of object detection, we recommend providing several labeled examples with different numbers of boxes.

When used through the AWS Console, Ground Truth helps you create the instructions using a visual wizard. When using the API, you need to create an HTML template for your instructions.

NOTE: If you use any images in your template (as we do), they need to be publicly accessible. You can enable public access to files in your S3 bucket through the S3 Console, as described in S3 Documentation.

Specify Resources Used for Human Task UI

The human task user interface (UI) is the interface that human workers use to label your data. Depending on the type of labeling job you create, you will specify a resource that is used to generate the human task UI in the UiConfig parameter of CreateLabelingJob.

For 3D point cloud and video frame labeling tasks, you will specify a pre-defined HumanTaskUiARN. For all other labeling job task types, you will specify a UiTemplateS3Uri.

Bounding Box Adjustment Labeling Job (Default)

If you set DEFAULT to True, use the following to create a worker task template and upload it to your S3 bucket. Ground Truth uses this template to generate your human task UI.

[ ]:
from IPython.display import HTML, display


def make_template(save_fname="instructions2.template"):
    template = r"""<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
    <crowd-form>
      <crowd-bounding-box
        name="boundingBox"
        src="{{{{ task.input.taskObject | grant_read_access }}}}"
        header="Dear Annotator, please adjust box around each object you see."
        labels="{{{{ task.input.labels | to_json | escape }}}}"
        initial-value="[
           {{% for box in task.input.manifestLine.{label_attribute_name_from_prior_job}.annotations %}}
             {{% capture class_id %}}{{{{ box.class_id }}}}{{% endcapture %}}
             {{% assign label = task.input.manifestLine.{label_attribute_name_from_prior_job}-metadata.class-map[class_id] %}}
           {{
           label: {{{{label | to_json}}}},
           left: {{{{box.left}}}},
           top: {{{{box.top}}}},
           width: {{{{box.width}}}},
           height: {{{{box.height}}}},
           }},
          {{% endfor %}}
       ]"
      >
        <full-instructions header="Bounding box adjustment instructions">

    <ol>
        <li><strong>Inspect</strong> the image</li>
        <li><strong>Determine</strong> if the specified label is visible in the picture.</li>
        <li><strong>Outline</strong> each instance of the specified label in the image using the provided “Box” tool.</li>
    </ol>


        </full-instructions>
        <short-instructions>
         <ul>
        <li>Boxes should fit tightly around each object</li>
        <li>Do not include parts of the object that are overlapping or that cannot be seen, even if you think you can interpolate the whole shape.</li>
        <li>Avoid including shadows.</li>
        <li>If the target is off screen, draw the box up to the edge of the image.</li>
        </ul>
        </short-instructions>
      </crowd-bounding-box>
    </crowd-form>

    """.format(
        label_attribute_name_from_prior_job=LABEL_ATTRIBUTE_NAME
    )
    with open(save_fname, "w") as f:
        f.write(template)


if DEFAULT:
    make_template(save_fname="instructions2.template")
[ ]:
if DEFAULT:
    result = s3.upload_file("instructions2.template", BUCKET, "instructions2.template")

Image, Text, and Custom Labeling Jobs (Non Default)

For all image and text based built-in task types, you can find a sample worker task template on that task type page. Find the page for your task type on Built-in Task Types. You will see an example template under the section Create a {Insert-Task-Type} Job (API).

The following template shows an example of a Semantic Segmentation adjustment job template. This template can be used to render segmentation masks from a previous labeling job, to have workers adjust or add to the mask in the new labeling job.

For custom labeling workflows, you can provide a custom HTML worker task template using Crowd HTML Elements. To learn more, see Step 2: Creating your custom labeling task template.

Ground Truth uses this template to generate your human task UI.

Important: If you use your own template with the following make_template function to create and upload a worker task template to Amazon S3, you must add an extra pair of {} brackets around each Liquid element. For example, if the template contains {{ task.input.labels | to_json | escape }}, this line should look as follows in the make_template variable template: {{{{ task.input.labels | to_json | escape }}}}. The following semantic segmentation template already includes an extra pair of {} brackets around each Liquid element.
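
The escaping rule above follows from how Python's str.format treats braces: each doubled brace becomes one literal brace, and a single-braced name is filled in. A short check, using a made-up label attribute name (my-job-1) as the substituted value:

```python
# str.format() turns each doubled brace into one literal brace and fills {attr}.
snippet = (
    'labels="{{{{ task.input.labels | to_json | escape }}}}" '
    "{{% for box in task.input.manifestLine.{attr}.annotations %}}"
)

# "my-job-1" is a made-up label attribute name used only for this illustration.
rendered = snippet.format(attr="my-job-1")
print(rendered)
```

The printed string contains the Liquid elements with their normal double braces and tag delimiters, which is the form that ends up in the uploaded template.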

[ ]:
from IPython.display import HTML, display


def make_template(save_fname="instructions2.template"):
    template = r"""<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
<crowd-form>
  <crowd-semantic-segmentation
 initial-value="{{
  'src' : '{{{{ task.input.manifestLine.{label_attribute_name_from_prior_job}| grant_read_access }}}}',
  'labelMappings': {{
     {{% for box in task.input.manifestLine.{label_attribute_name_from_prior_job}-metadata.internal-color-map %}}
       {{% if box[1]['class-name'] != 'BACKGROUND' %}}
         {{{{ box[1]['class-name'] | to_json }}}}: {{
           'color': {{{{ box[1]['hex-color'] | to_json }}}}
         }},
        {{% endif %}}
        {{% endfor %}}
       }}
    }}"
    name="crowd-semantic-segmentation"
    src="{{{{ task.input.taskObject | grant_read_access }}}}"
    header="highlight bridges"
    labels="{{{{ task.input.labels | to_json | escape }}}}"
  >
    <full-instructions header="Segmentation instructions">
      <ol><li><strong>Read</strong> the task carefully and inspect the image.</li><li><strong>Read</strong> the options and review the examples provided to understand more about the labels.</li><li><strong>Choose</strong> the appropriate label that best suits the image.</li></ol>
    </full-instructions>
    <short-instructions>
      <h3><span style="color: rgb(0, 138, 0);">Good example</span></h3><p>Enter description to explain a correctly done segmentation</p><p><img src="https://d7evko5405gb7.cloudfront.net/ae0c1149-12cb-44b6-bff4-6171a09fb83c/src/images/quick-instructions-example-placeholder.png" style="max-width:100%"></p><h3><span style="color: rgb(230, 0, 0);">Bad example</span></h3><p>Enter description of an incorrectly done segmentation</p><p><img src="https://d7evko5405gb7.cloudfront.net/ae0c1149-12cb-44b6-bff4-6171a09fb83c/src/images/quick-instructions-example-placeholder.png" style="max-width:100%"></p>
    </short-instructions>
  </crowd-semantic-segmentation>
</crowd-form>""".format(
        label_attribute_name_from_prior_job=LABEL_ATTRIBUTE_NAME
    )
    with open(save_fname, "w") as f:
        f.write(template)


# This will upload your template to S3 if you are not running on DEFAULT mode, and if your task type
# does not use video frames or 3D point clouds.
if not DEFAULT and not video_frame_task and not point_cloud_task:
    make_template(save_fname="instructions2.template")
    s3.upload_file("instructions2.template", BUCKET, "instructions2.template")

3D Point Cloud and Video Frame Task Types

If you are creating a 3D point cloud or video frame task type, your worker UI is configured by Ground Truth. If you chose one of these task types above, the following cell will specify the correct template.

[ ]:
import re

if not DEFAULT:
    if point_cloud_task:
        task_type_suffix_humanuiarn = task_type_suffix.split("3D")[-1]
        HUMAN_UI_ARN2 = (
            f"arn:aws:sagemaker:{region}:394669845002:human-task-ui/{task_type_suffix_humanuiarn}"
        )
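    if video_frame_task:
        HUMAN_UI_ARN2 = f"arn:aws:sagemaker:{region}:394669845002:human-task-ui/{task_type_suffix}"
    # Print the ARN for both point cloud and video frame task types.
    if point_cloud_task or video_frame_task:
        print(f"The Human Task UI ARN is: {HUMAN_UI_ARN2}")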

(Optional) Create an Input Manifest File

You can optionally specify an input manifest file Amazon S3 URI in ManifestS3Uri when you create the streaming labeling job. Ground Truth sends each data object in the manifest file to workers for labeling as soon as the labeling job starts.

Each line in an input manifest file is an entry containing an object, or a reference to an object, to label. An entry can also contain labels from previous jobs and for some task types, additional information.
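
As a minimal illustration of that format, the following sketch builds a two-line manifest in which every line is a standalone JSON object with a source-ref key. The bucket name and object keys are placeholders, not resources used by this notebook.

```python
import json

# Placeholder bucket and object keys, for illustration only.
image_uris = [
    "s3://example-bucket/images/img1.jpg",
    "s3://example-bucket/images/img2.jpg",
]

# One JSON object per line; "source-ref" points at the object to label.
manifest_lines = [json.dumps({"source-ref": uri}) for uri in image_uris]
manifest_text = "\n".join(manifest_lines) + "\n"

print(manifest_text)
```

The resulting text can be written to a file and uploaded to Amazon S3 in the same way as the other files in this notebook.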

To learn how to create an input manifest file, see Use an Input Manifest File. Copy the S3 URI of the file below.

As Job 2 is a chained job, you can also connect the output manifest of Job 1 to Job 2. In this case, Job 2 may receive the same objects from both the output SNS topic and the output S3 manifest of Job 1. These are considered duplicates and are ignored if the idempotency key is the same. For simplicity, leave this field blank unless you really need it.

[ ]:
# [Optional] The path in Amazon S3 to your input manifest file.
INPUT_MANIFEST = ""

Specify Parameters for Labeling Job 2

If you set DEFAULT to False, you must specify the following parameters. These are used to configure and create your labeling job. If you set DEFAULT to True, default parameters are used.

To learn more about these parameters, use the following documentation:

* TaskTitle
* TaskDescription
* TaskKeywords

[ ]:
TASK_TITLE2 = "<<ADD-TASK-TITLE>>"

if DEFAULT:
    TASK_TITLE2 = "Adjust Bounding boxes around objects"

TASK_DESCRIPTION2 = "<<ADD-TASK-DESCRIPTION>>"

if DEFAULT:
    TASK_DESCRIPTION2 = "Adjust bounding boxes around specified objects in your images"

# Keywords for your task, as a list of strings, e.g. ['image classification', 'image dataset']
TASK_KEYWORDS2 = ["<<ADD-KEYWORDS>>"]

if DEFAULT:
    TASK_KEYWORDS2 = ["bounding box", "image dataset"]
[ ]:
# The path in Amazon S3 to your worker task template or human task UI
HUMAN_UI2 = []
if point_cloud_task or video_frame_task:
    HUMAN_TASK_UI_ARN2 = HUMAN_UI_ARN2
    HUMAN_UI2.append(HUMAN_TASK_UI_ARN2)
    UI_CONFIG_PARAM = "HumanTaskUiArn"
else:
    UI_TEMPLATE_S3_URI2 = f"s3://{BUCKET}/instructions2.template"
    HUMAN_UI2.append(UI_TEMPLATE_S3_URI2)
    UI_CONFIG_PARAM = "UiTemplateS3Uri"

print(f"{UI_CONFIG_PARAM} resource that will be used: {HUMAN_UI2[0]}")
[ ]:
# If you want to store your output manifest in a different folder, change OUTPUT_FOLDER_PREFIX.
OUTPUT_FOLDER_PREFIX = "/gt-streaming-demo-output"
OUTPUT_BUCKET = "s3://" + BUCKET + OUTPUT_FOLDER_PREFIX
print("Your output data will be stored in:", OUTPUT_BUCKET)

# An IAM role with the AmazonSageMakerGroundTruthExecution policy attached.
# This must be the same role that you used to create this notebook instance.
ROLE_ARN = role

Use the CreateLabelingJob API to Create a 2nd Streaming Labeling Job

[ ]:
if (
    re.search(r"Semantic Segmentation", task_type) is not None
    or re.search(r"Object Tracking", task_type) is not None  # search, not match: the phrase is not at the start of the name
    or video_frame_task
):
    LABEL_ATTRIBUTE_NAME2 = LABELING_JOB_NAME2 + "-ref"
else:
    LABEL_ATTRIBUTE_NAME2 = LABELING_JOB_NAME2

human_task_config = {
    "PreHumanTaskLambdaArn": PRE_HUMAN_TASK_LAMBDA2,
    "MaxConcurrentTaskCount": 100,  # Maximum of 100 objects will be available to the workteam at any time
    "NumberOfHumanWorkersPerDataObject": 1,  # We will obtain and consolidate 1 human annotation for each image.
    "TaskAvailabilityLifetimeInSeconds": 21600,  # Your workteam has 6 hours to complete all pending tasks.
    "TaskDescription": TASK_DESCRIPTION2,
    # If using public workforce, specify "PublicWorkforceTaskPrice"
    "WorkteamArn": WORKTEAM_ARN,
    "AnnotationConsolidationConfig": {"AnnotationConsolidationLambdaArn": POST_ANNOTATION_LAMBDA2},
    "TaskKeywords": TASK_KEYWORDS2,
    "TaskTimeLimitInSeconds": 600,  # Each image must be labeled within 10 minutes.
    "TaskTitle": TASK_TITLE2,
    "UiConfig": {UI_CONFIG_PARAM: HUMAN_UI2[0]},
}

# If you are using the Amazon Mechanical Turk workforce, specify the amount you want to pay a
# worker to label a data object. See https://aws.amazon.com/sagemaker/groundtruth/pricing/ for recommendations.
# WorkteamArn is already set in human_task_config above, so it does not need to be assigned again.
if not private_work_team:
    human_task_config["PublicWorkforceTaskPrice"] = {
        "AmountInUsd": {
            "Dollars": 0,
            "Cents": 3,
            "TenthFractionsOfACent": 6,
        }
    }

ground_truth_request2 = {
    "InputConfig": {"DataSource": {"SnsDataSource": {"SnsTopicArn": INPUT_TOPIC_ARN2}}},
    "HumanTaskConfig": human_task_config,
    "LabelAttributeName": LABEL_ATTRIBUTE_NAME2,
    "LabelCategoryConfigS3Uri": LABEL_CATEGORIES_S3_URI2,
    "LabelingJobName": LABELING_JOB_NAME2,
    "OutputConfig": {"S3OutputPath": OUTPUT_BUCKET, "SnsTopicArn": OUTPUT_TOPIC_ARN2},
    "RoleArn": ROLE_ARN,
}

if INPUT_MANIFEST != "":
    ground_truth_request2["InputConfig"]["DataSource"]["S3DataSource"] = {
        "ManifestS3Uri": INPUT_MANIFEST
    }
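
The PublicWorkforceTaskPrice value in the cell above (0 dollars, 3 cents, 6 tenths of a cent) corresponds to $0.036 per object. The following sketch shows the arithmetic behind the three fields; to_public_workforce_price is a hypothetical helper, and it only splits a number into fields without checking whether the price is one that the service accepts.

```python
def to_public_workforce_price(tenths_of_a_cent):
    """Split a price given in tenths of a cent into the three AmountInUsd fields."""
    if tenths_of_a_cent < 0:
        raise ValueError("Price must be non-negative.")
    dollars, remainder = divmod(tenths_of_a_cent, 1000)  # 1000 tenths of a cent per dollar
    cents, tenth_fractions = divmod(remainder, 10)  # 10 tenths of a cent per cent
    return {
        "AmountInUsd": {
            "Dollars": dollars,
            "Cents": cents,
            "TenthFractionsOfACent": tenth_fractions,
        }
    }


price = to_public_workforce_price(36)  # 36 tenths of a cent = $0.036
print(price)
```

Working in integer tenths of a cent avoids floating-point rounding when the price is split into fields.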

You should not share explicit, confidential, or personal information or protected health information with the Amazon Mechanical Turk workforce.

If you are using the Amazon Mechanical Turk workforce, you must declare that your data is free of personal, confidential, and explicit content and protected health information by running the following code cell.

[ ]:
if not private_work_team:
    ground_truth_request2["InputConfig"]["DataAttributes"] = {
        "ContentClassifiers": ["FreeOfPersonallyIdentifiableInformation", "FreeOfAdultContent"]
    }
[ ]:
print("Your create labeling job request:\n", json.dumps(ground_truth_request2, indent=4))
[ ]:
sagemaker_client = boto3.client("sagemaker")
sagemaker_client.create_labeling_job(**ground_truth_request2)

Use the DescribeLabelingJob API to describe 2nd Streaming Labeling Job

[ ]:
sagemaker_client.describe_labeling_job(LabelingJobName=LABELING_JOB_NAME2)

Publish a new object to your first labeling job [Job 1] once it has started

Once you start a labeling job, you can publish a new request to it using Amazon SNS.

Configure your Request

You will need to specify REQUEST in the following format:

For non-text objects

First, make sure that your object is located in s3_bucket_location

{"source-ref": "s3_bucket_location"}

For text objects

{"source": "Lorem ipsum dolor sit amet"}

Modify one of these examples to specify your request in the next cell.

[ ]:
REQUEST = "<Populate your object as shown above>"
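
Either request form can be produced with json.dumps, which ensures that the message is valid JSON. The following is a minimal sketch; make_request is a hypothetical helper and the bucket name is a placeholder.

```python
import json


def make_request(s3_uri=None, text=None):
    """Return the JSON string for one data object: source-ref for S3 objects, source for text."""
    if (s3_uri is None) == (text is None):
        raise ValueError("Provide exactly one of s3_uri or text.")
    if s3_uri is not None:
        if not s3_uri.startswith("s3://"):
            raise ValueError("s3_uri must start with s3://")
        return json.dumps({"source-ref": s3_uri})
    return json.dumps({"source": text})


print(make_request(s3_uri="s3://example-bucket/example-image.jpg"))
print(make_request(text="Lorem ipsum dolor sit amet"))
```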

If you set DEFAULT to True, use the following cell to upload a sample image to your S3 bucket and send that image to your labeling job.

[ ]:
if DEFAULT:
    !wget https://aws-ml-blog.s3.amazonaws.com/artifacts/gt-labeling-job-resources/example-image.jpg
    s3.upload_file("example-image.jpg", BUCKET, "example-image.jpg")
    # json.dumps produces valid JSON (double-quoted strings); str() on a dict does not.
    REQUEST = json.dumps({"source-ref": f"s3://{BUCKET}/example-image.jpg"})
print(f"Your request: {REQUEST}")

Publish Your Request

First, check the LabelCounters variable for your labeling job using DescribeLabelingJob. After you publish your request, Unlabeled should increase to 1 (or to the number of objects you send to your labeling job).

[ ]:
sagemaker_client.describe_labeling_job(LabelingJobName=LABELING_JOB_NAME)["LabelCounters"]

The following will publish your request to your Amazon SNS input topic.

[ ]:
# TopicArn is of the first job [Job 1]
print(f"Your Request: {REQUEST}\n")
if REQUEST != "<Populate your object as shown above>":
    published_message = sns.publish(TopicArn=INPUT_TOPIC_ARN, Message=REQUEST)
    print(f"Published Message: {published_message}")

You may need to wait 1 to 2 minutes for your request to appear in LabelCounters.

[ ]:
sagemaker_client.describe_labeling_job(LabelingJobName=LABELING_JOB_NAME)["LabelCounters"]
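
Instead of re-running the cell by hand, the counters can be polled until the object shows up. The following is a sketch under stated assumptions: wait_for_unlabeled is a hypothetical helper, the timeout and interval values are arbitrary, and fake_describe stands in for sagemaker_client.describe_labeling_job so that the example runs without AWS access.

```python
import time


def wait_for_unlabeled(describe_fn, job_name, expected, timeout_s=180, interval_s=15):
    """Poll LabelCounters until Unlabeled reaches `expected`; return the last counters seen."""
    deadline = time.monotonic() + timeout_s
    while True:
        counters = describe_fn(LabelingJobName=job_name)["LabelCounters"]
        if counters["Unlabeled"] >= expected or time.monotonic() >= deadline:
            return counters
        time.sleep(interval_s)


# Stand-in responses: the object appears on the second call.
_responses = iter([{"LabelCounters": {"Unlabeled": 0}}, {"LabelCounters": {"Unlabeled": 1}}])


def fake_describe(LabelingJobName):
    return next(_responses)


counters = wait_for_unlabeled(fake_describe, "example-job", expected=1, interval_s=0)
print(counters)
```

With a real client, pass sagemaker_client.describe_labeling_job as describe_fn and LABELING_JOB_NAME as job_name.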

After your first job finishes, check the status of your chained job. You should see your request appear in LabelCounters.

[ ]:
sagemaker_client.describe_labeling_job(LabelingJobName=LABELING_JOB_NAME2)["LabelCounters"]

Call StopLabelingJob for your previously launched jobs

To stop your streaming jobs, call StopLabelingJob twice: once with LABELING_JOB_NAME and once with LABELING_JOB_NAME2.

[ ]:
sagemaker_client.stop_labeling_job(LabelingJobName=LABELING_JOB_NAME)
[ ]:
sagemaker_client.stop_labeling_job(LabelingJobName=LABELING_JOB_NAME2)
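
If one of the jobs has already stopped, the corresponding call may raise an error. The following sketch stops several jobs in a loop and collects the names that failed; stop_jobs and FakeClient are hypothetical names, and FakeClient only stands in for the SageMaker client so that the example runs without AWS access.

```python
def stop_jobs(client, job_names):
    """Call stop_labeling_job for each name and return the names that raised an error."""
    failed = []
    for name in job_names:
        try:
            client.stop_labeling_job(LabelingJobName=name)
        except Exception as err:  # with boto3 this would typically be a ClientError
            print(f"Could not stop {name}: {err}")
            failed.append(name)
    return failed


class FakeClient:
    """Stand-in for the SageMaker client; it only records the job names it receives."""

    def __init__(self):
        self.stopped = []

    def stop_labeling_job(self, LabelingJobName):
        self.stopped.append(LabelingJobName)


fake = FakeClient()
failed = stop_jobs(fake, ["example-job-1", "example-job-2"])
print(fake.stopped, failed)
```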