Create a Ground Truth Streaming Labeling Job

You can use a streaming labeling job to perpetually send new data objects to Amazon SageMaker Ground Truth to be labeled. Ground Truth streaming labeling jobs remain active until they are manually stopped or have been idle for more than 10 days. You can intermittently send new data objects to workers while the labeling job is active.

Use this notebook to create a Ground Truth streaming labeling job using any of the built-in task types. You can make necessary parameter changes for the custom workflow. You can either configure the notebook to create a labeling job using your own input data, or run the notebook on default mode and use provided, image input data. To use your own input data, set ``DEFAULT`` to ``False``.

[ ]:

To read more about streaming labeling jobs, see the Amazon SageMaker documentation on Ground Truth Streaming Labeling Jobs.

To learn more about each step in this notebook, refer to Create a Streaming Labeling Job.

Get latest version of AWS python SDK

[ ]:
!pip install -q --upgrade pip
!pip install awscli -q --upgrade
!pip install botocore -q --upgrade
!pip install boto3 -q --upgrade
!pip install sagemaker -q --upgrade

# NOTE: Restart Kernel after the above command
[ ]:
import boto3
import botocore
import json
import time
import sagemaker
import re


You will create some of the resources you need to launch a Ground Truth streaming labeling job in this notebook. You must create the following resources before executing this notebook:

  • A work team. A work team is a group of workers that complete labeling tasks. If you want to preview the worker UI and execute the labeling task you will need to create a private work team, add yourself as a worker to this team, and provide the work team ARN below. If you do not want to use a private or vendor work team ARN, set private_work_team to False to use the Amazon Mechanical Turk workforce. To learn more about private, vendor, and Amazon Mechanical Turk workforces, see Create and Manage Workforces.

    • IMPORTANT: 3D point cloud and video frame labeling jobs only support private and vendor workforces. If you plan to use 3D point cloud or video frame input data, specify a private or vendor workforce below for WORKTEAM_ARN.

  • If you use this notebook to create an image, video frame, or 3D point cloud labeling job, the S3 bucket that you use for this demo must have a CORS policy attached. To learn more about this requirement, and how to attach a CORS policy to an S3 bucket, see CORS Permission Requirement.

[ ]:
private_work_team = True  # Set it to false if using Amazon Mechanical Turk Workforce

if private_work_team:
    region = boto3.session.Session().region_name
    WORKTEAM_ARN = f"arn:aws:sagemaker:{region}:394669845002:workteam/public-crowd/default"
print(f"This notebook will use the work team ARN: {WORKTEAM_ARN}")
[ ]:
# Make sure workteam arn is populated if private work team is chosen
  • The IAM execution role you used to create this notebook instance must have the following permissions:

    • AWS managed policy AmazonSageMakerGroundTruthExecution. Run the following code-block to see your IAM execution role name. This GIF demonstrates how to add this policy to an IAM role in the IAM console. You can also find instructions in the IAM User Guide: Adding and removing IAM identity permissions.

    • When you create your role, you specify Amazon S3 permissions. Make sure that your IAM role has access to the S3 bucket that you plan to use in this example. If you do not specify an S3 bucket in this notebook, the default bucket in the AWS region you are running this notebook instance will be used. If you do not require granular permissions, you can attach AmazonS3FullAccess to your role.

[ ]:
role = sagemaker.get_execution_role()
role_name = role.split("/")[-1]
    "IMPORTANT: Make sure this execution role has the AWS Managed policy AmazonGroundTruthExecution attached."
print("The IAM execution role name:", role_name)
print("The IAM execution role ARN:", role)
[ ]:
sess = sagemaker.Session()
    BUCKET = sess.default_bucket()
region = boto3.session.Session().region_name
s3 = boto3.client("s3")
# Make sure the bucket is in the same region as this notebook.
bucket_region = s3.head_bucket(Bucket=BUCKET)["ResponseMetadata"]["HTTPHeaders"][
assert (
    bucket_region == region
), f"Your S3 bucket {BUCKET} and this notebook need to be in the same region."
print(f"IMPORTANT: make sure the role {role_name} has the access to read and write to this bucket.")
print(f"This notebook will use the following S3 bucket: {BUCKET}")

Create SNS Topics for Input and Output Data

You can send data objects to your streaming labeling job using Amazon Simple Notification Service (Amazon SNS). Amazon SNS is a web service that coordinates and manages the delivery of messages to and from endpoints (for example, an email address or AWS Lambda function). An Amazon SNS topic acts as a communication channel between two or more endpoints. You use Amazon SNS to send, or publish, new data objects to the topic specified in the CreateLabelingJob parameter SnsTopicArn in InputConfig.

The following cells will create a name for your labeling job and use this name to create Amazon SNS input and output topics. This labeling job name and these topics will be used in your CreateLabelingJob request later in this notebook.

[ ]:
# Job Name
LABELING_JOB_NAME = "GroundTruth-streaming-" + str(int(time.time()))
print("Your labeling job name will be :", LABELING_JOB_NAME)
[ ]:
# Make sure role has "Sns:CreateTopic" access
sns = boto3.client("sns")

# Create Input Topic
input_response = sns.create_topic(Name=LABELING_JOB_NAME + "-Input")
INPUT_SNS_TOPIC_ARN = input_response["TopicArn"]
print("input_sns_topic :", INPUT_SNS_TOPIC_ARN)

# Create Output Topic
output_response = sns.create_topic(Name=LABELING_JOB_NAME + "-Output")
OUTPUT_SNS_TOPIC_ARN = output_response["TopicArn"]
print("output_sns_topic :", OUTPUT_SNS_TOPIC_ARN)

Choose Labeling Job Type

Ground Truth supports a variety of built-in task types which streamline the process of creating image, text, video, video frame, and 3D point cloud labeling jobs. You can use this notebook on default mode if you do not want to bring your own input data.

If you have input data and an input manifest file in an S3 bucket, set DEFAULT to False and, optionally, choose the Labeling Job Task Type you want to use below. To learn more about each task type, see Built-in Task Types.

Choose Labeling Job Built-In Task Type

Copy one of the following task types and use it to set the value for task_type. If you set ``DEFAULT`` to True, at the beginning of this notebook, the image bounding box task type will be used by default.

[ ]:
## Choose from following:
## Bounding Box
## Image Classification (Single Label)
## Image Classification (Multi-label)
## Image Semantic Segmentation
## Text Classification (Single Label)
## Text Classification (Multi-label)
## Named Entity Recognition
## Video Classification
## Video Frame Object Detection
## Video Frame Object Tracking
## 3D Point Cloud Object Detection
## 3D Point Cloud Object Detection
## 3D Point Cloud Semantic Segmentation

    task_type = "Bounding Box"
print(f"Your task type: {task_type}")

The following cells will configure the lambda functions Ground Truth uses to pre-process your input data and output data. These cells will configure your PreHumanTaskLambdaArn and AnnotationConsolidationLambdaArn.

[ ]:
task_type_map = {
    "Bounding Box": "BoundingBox",
    "Image Classification (Single Label)": "ImageMultiClass",
    "Image Classification (Multi-label)": "ImageMultiClassMultiLabel",
    "Image Semantic Segmentation": "SemanticSegmentation",
    "Text Classification (Single Label)": "TextMultiClass",
    "Text Classification (Multi-label)": "TextMultiClassMultiLabel",
    "Named Entity Recognition": "NamedEntityRecognition",
    "Video Classification": "VideoMultiClass",
    "Video Frame Object Detection": "VideoObjectDetection",
    "Video Frame Object Tracking": "VideoObjectTracking",
    "3D Point Cloud Object Detection": "3DPointCloudObjectDetection",
    "3D Point Cloud Object Tracking": "3DPointCloudObjectTracking",
    "3D Point Cloud Semantic Segmentation": "3DPointCloudSemanticSegmentation",

arn_region_map = {
    "us-west-2": "081040173940",
    "us-east-1": "432418664414",
    "us-east-2": "266458841044",
    "eu-west-1": "568282634449",
    "eu-west-2": "487402164563",
    "ap-northeast-1": "477331159723",
    "ap-northeast-2": "845288260483",
    "ca-central-1": "918755190332",
    "eu-central-1": "203001061592",
    "ap-south-1": "565803892007",
    "ap-southeast-1": "377565633583",
    "ap-southeast-2": "454466003867",
[ ]:
task_type_suffix = task_type_map[task_type]
region_account = arn_region_map[region]
PRE_HUMAN_TASK_LAMBDA = f"arn:aws:lambda:{region}:{region_account}:function:PRE-{task_type_suffix}"
POST_ANNOTATION_LAMBDA = f"arn:aws:lambda:{region}:{region_account}:function:ACS-{task_type_suffix}"

3D point cloud and video frame task types have special requirements. The following variables will be used to configure your labeling job for these task types. To learn more, see the following topics in the documentation: * 3D Point Cloud Labeling Jobs Overview * Video Frame Labeling Job Overview

[ ]:
point_cloud_task ="Point Cloud", task_type) is not None
video_frame_task ="Video Frame", task_type) is not None

Create Custom Labeling Workflow

If you want to create a custom labeling workflow, you can create your own lambda functions to pre-process your input data and post-process the labels returned from workers. To learn more, see Step 3: Processing with AWS Lambda.

To use this notebook to run a custom flow, set CUSTOM to True and specify your pre- and post-processing lambdas below.

[ ]:
CUSTOM = False

Specify Labels

You specify the labels that you want workers to use to annotate your data in a label category configuration file. When you create a 3D point cloud or video frame labeling job, you can add label category attributes to your labeling category configruation file. Workers can assign one or more attributes to annotations to give more information about that object.

For all task types, you can use the following cell to identify the labels you use for your labeling job. To create a label category configuration file with label category attributes, see Create a Labeling Category Configuration File with Label Category Attributes in the Amazon SageMaker developer guide.

[ ]:
# Add label categories of your choice
    LABEL_CATEGORIES = ["Pedestrian", "Street Car", "Biker"]

The following cell will create a label category configuration file using the labels specified above.

IMPORTANT: Make sure you have added label categories above and they appear under labels when you run the following cell.

[ ]:
# Specify labels and this notebook will upload and a label category configuration file to S3.
json_body = {
    "document-version": "2018-11-28",
    "labels": [{"label": label} for label in LABEL_CATEGORIES],
with open("class_labels.json", "w") as f:
    json.dump(json_body, f)

print("Your label category configuration file:")
print("\n", json.dumps(json_body, indent=2))
[ ]:
s3.upload_file("class_labels.json", BUCKET, "class_labels.json")
[ ]:
LABEL_CATEGORIES_S3_URI = f"s3://{BUCKET}/class_labels.json"
print(f"You should now see class_labels.json in {LABEL_CATEGORIES_S3_URI}")

Create A Worker Task Template

Part or all of your images will be annotated by human annotators. It is essential to provide good instructions. Good instructions are:

  1. Concise. We recommend limiting verbal/textual instruction to two sentences and focusing on clear visuals.

  2. Visual. In the case of object detection, we recommend providing several labeled examples with different numbers of boxes.

  3. When used through the AWS Console, Ground Truth helps you create the instructions using a visual wizard. When using the API, you need to create an HTML template for your instructions.

NOTE: If you use any images in your template (as we do), they need to be publicly accessible. You can enable public access to files in your S3 bucket through the S3 Console, as described in S3 Documentation.

Specify Resources Used for Human Task UI

The human task user interface (UI) is the interface that human workers use to label your data. Depending on the type of labeling job you create, you will specify a resource that is used to generate the human task UI in the UiConfig parameter of CreateLabelingJob.

For 3D point cloud and video frame labeling tasks, you will specify a pre-defined HumanTaskUiARN. For all other labeling job task types, you will specify a UiTemplateS3Uri.

Bounding Box Image Labeling Job (Default)

If you set DEFAULT to True, use the following to create a worker task template and upload it to your S3 bucket. Ground Trust uses this template to generate your human task UI.

[ ]:
from IPython.core.display import HTML, display

def make_template(save_fname="instructions.template"):
    template = r"""<script src=""></script>
        src="{{{{ task.input.taskObject | grant_read_access }}}}"
        header="Dear Annotator, please draw a tight box around each object you see (if there are more than 8 objects, draw boxes around at least 8)."
        labels="{{{{ task.input.labels | to_json | escape }}}}"
        <full-instructions header="Please annotate each object">

        <li><strong>Inspect</strong> the image</li>
        <li><strong>Determine</strong> if the specified label is/are visible in the picture.</li>
        <li><strong>Outline</strong> each instance of the specified label in the image using the provided “Box” tool.</li>

        <li>Boxes should fit tightly around each object</li>
        <li>Do not include parts of the object are overlapping or that cannot be seen, even though you think you can interpolate the whole shape.</li>
        <li>Avoid including shadows.</li>
        <li>If the target is off screen, draw the box up to the edge of the image.</li>

    with open(save_fname, "w") as f:

[ ]:
    result = s3.upload_file("instructions.template", BUCKET, "instructions.template")

Image, Text, and Custom Labeling Jobs (Non Default)

For all image and text based built-in task types, you can find a sample worker task template on that task type page. Find the page for your task type on Built-in Task Types. You will see an example template under the section Create a {Insert-Task-Type} Job (API).

Update <full-instructions></full-instructions> and <short-instructions></short-instructions>. Add your template to the following code block and run the code blocks below to generate your worker task template and upload it to your S3 bucket.

For custom labeling workflows, you can provide a custom HTML worker task template using Crowd HTML Elements. To learn more, see Step 2: Creating your custom labeling task template.

Ground Trust uses this template to generate your human task UI.

Important: If you use the following make_template function to create and upload a worker task template to Amazon S3, you must add an extra pair of {} brackets around each Liquid element. For example, if the template contains {{ task.input.labels | to_json | escape }}, this line should look as follows in the make_template variable template: {{{{ task.input.labels | to_json | escape }}}}.

[ ]:
from IPython.core.display import HTML, display

def make_template(save_fname="instructions.template"):
    template = r"""
    with open(save_fname, "w") as f:

# This will upload your template to S3 if you are not running on DEFAULT mode, and if your take type
# does not use video frames or 3D point clouds.
if not DEFAULT and not video_frame_task and not point_cloud_task:
    s3.upload_file("instructions.template", BUCKET, "instructions.template")

3D Point Cloud and Video Frame Task Types

If you are creating a 3D point cloud or video frame task type, your worker UI is configured by Ground Truth. If you chose one of these task types above, the following cell will specify the correct template.

[ ]:
import re

if not DEFAULT:
    if point_cloud_task:
        task_type_suffix_humanuiarn = task_type_suffix.split("3D")[-1]
        HUMAN_UI_ARN = (
    if video_frame_task:
        HUMAN_UI_ARN = f"arn:aws:sagemaker:{region}:394669845002:human-task-ui/{task_type_suffix}"
    print(f"The Human Task UI ARN is: {HUMAN_UI_ARN}")

(Optional) Create an Input Manifest File

You can optionally specify an input manifest file Amazon S3 URI in ManifestS3Uri when you create the streaming labeling job. Ground Truth sends each data object in the manifest file to workers for labeling as soon as the labeling job starts.

Each line in an input manifest file is an entry containing an object, or a reference to an object, to label. An entry can also contain labels from previous jobs and for some task types, additional information.

To learn how to create an input manifest file, see Use an Input Manifest File. Copy the S3 URI of the file below.

[ ]:
# [Optional] The path in Amazon S3 to your input manifest file.

Specify Parameters for Labeling Job

If you set DEFAULT to False, you must specify the following parameters. These will be used to configure and create your lableing job. If you set DEFAULT to True, default parameters will be used.

To learn more about these parameters, use the following documentation: * TaskTitle * TaskDescription * TaskKeywords

[ ]:
    TASK_TITLE = "Add bounding boxes to detect objects in an image"

    TASK_DESCRIPTION = "Categorize images into classes using bounding boxes"

# Keywords for your task, in a string-array. ex) ['image classification', 'image dataset']
    TASK_KEYWORDS = ["bounding box", "image dataset"]

Run the following to specify the rest of the parameters required to configure your labeling job.

[ ]:
# The path in Amazon S3 to your worker task template or human task UI
if point_cloud_task or video_frame_task:
    UI_CONFIG_PARAM = "HumanTaskUiArn"
    UI_TEMPLATE_S3_URI = f"s3://{BUCKET}/instructions.template"
    UI_CONFIG_PARAM = "UiTemplateS3Uri"

print(f"{UI_CONFIG_PARAM} resource that will be used: {HUMAN_UI[0]}")
[ ]:
# If you want to store your output manifest in a different folder, provide an OUTPUT_PATH.
OUTPUT_FOLDER_PREFIX = "/gt-streaming-demo-output"
print("Your output data will be stored in:", OUTPUT_BUCKET)

# An IAM role with AmazonGroundTruthExecution policies attached.
# This must be the same role that you used to create this notebook instance.
ROLE_ARN = role

Use the CreateLabelingJob API to create a streaming labeling job

[ ]:
if ("Semantic Segmentation", task_type) is not None
    or re.match(r"Object Tracking", task_type) is not None
    or video_frame_task

human_task_config = {
    "PreHumanTaskLambdaArn": PRE_HUMAN_TASK_LAMBDA,
    "MaxConcurrentTaskCount": 100,  # Maximum of 100 objects will be available to the workteam at any time
    "NumberOfHumanWorkersPerDataObject": 1,  # We will obtain and consolidate 1 human annotationsfor each image.
    "TaskAvailabilityLifetimeInSeconds": 21600,  # Your workteam has 6 hours to complete all pending tasks.
    "TaskDescription": TASK_DESCRIPTION,
    # If using public workforce, specify "PublicWorkforceTaskPrice"
    "WorkteamArn": WORKTEAM_ARN,
    "AnnotationConsolidationConfig": {"AnnotationConsolidationLambdaArn": POST_ANNOTATION_LAMBDA},
    "TaskKeywords": TASK_KEYWORDS,
    "TaskTimeLimitInSeconds": 600,  # Each image must be labeled within 10 minutes.
    "TaskTitle": TASK_TITLE,
    "UiConfig": {UI_CONFIG_PARAM: HUMAN_UI[0]},

# if you are using the Amazon Mechanical Turk workforce, specify the amount you want to pay a
# worker to label a data object. See for recommendations.
if not private_work_team:
    human_task_config["PublicWorkforceTaskPrice"] = {
        "AmountInUsd": {
            "Dollars": 0,
            "Cents": 3,
            "TenthFractionsOfACent": 6,
    human_task_config["WorkteamArn"] = WORKTEAM_ARN
    human_task_config["WorkteamArn"] = WORKTEAM_ARN

ground_truth_request = {
    "InputConfig": {"DataSource": {"SnsDataSource": {"SnsTopicArn": INPUT_SNS_TOPIC_ARN}}},
    "HumanTaskConfig": human_task_config,
    "LabelAttributeName": LABEL_ATTRIBUTE_NAME,
    "LabelCategoryConfigS3Uri": LABEL_CATEGORIES_S3_URI,
    "LabelingJobName": LABELING_JOB_NAME,
    "OutputConfig": {"S3OutputPath": OUTPUT_BUCKET, "SnsTopicArn": OUTPUT_SNS_TOPIC_ARN},
    "RoleArn": ROLE_ARN,

if INPUT_MANIFEST is not "":
    ground_truth_request["InputConfig"]["DataSource"]["S3DataSource"] = {
        "ManifestS3Uri": INPUT_MANIFEST

You should not share explicit, confidential, or personal information or protected health information with the Amazon Mechanical Turk workforce.

If you are using Amazon Mechanical Turk workforce, you must verify that your data is free of personal, confidential, and explicit content and protected health information using this code cell.

[ ]:
if not private_work_team:
    ground_truth_request["InputConfig"]["DataAttributes"] = {
        "ContentClassifiers": ["FreeOfPersonallyIdentifiableInformation", "FreeOfAdultContent"]
[ ]:
print("Your create labeling job request:\n", json.dumps(ground_truth_request, indent=4))
[ ]:
sagemaker_client = boto3.client("sagemaker")

Use the DescribeLabelingJob API to describe a streaming labeling job

[ ]:

Wait until the labeling job status equals InProgress before moving forward in this notebook

[ ]:

Check for LabelingJobStatus and interpreting describe response

  • If you specified “S3DataSource.ManifestS3Uri” in the above request, the objects in the S3 file will automatically make their way to the labeling job. You will see counters incrementing from the objects from the file.

  • Streaming jobs create a SQS queue in your account. You can check for existence of the queue by name “GroundTruth-LABELING_JOB_NAME” via console or through below command

[ ]:
sqs = boto3.client("sqs")
response = sqs.get_queue_url(QueueName="GroundTruth-" + LABELING_JOB_NAME.lower())
print("Queue url is :", response["QueueUrl"])

Publish a new object to your labeling job once it has started

Once you start a labeling job, you an publish a new request to it using Amazon SNS.

Configure your Request

You will need to specify REQUEST in the following format:

For non-text objects

First, make sure that your object is located in s3_bucket_location

{"source-ref": "s3_bucket_location"}

For text objects

{"source": "Lorem ipsum dolor sit amet"}

Modify one of these examples to specify your request in the next cell.

[ ]:
REQUEST = "<Populate your object as shown above>"

If you set Default to True use the following cell upload a sample-image to your S3 bucket and send that image to labeling job.

[ ]:
    s3.upload_file('example-image.jpg', BUCKET, 'example-image.jpg')
    REQUEST = str({"source-ref": f"s3://{BUCKET}/example-image.jpg"})
print(f'Your request: {REQUEST}')

Publish Your Request

First, check the LabelCounters variable for your labeling job using DescribeLabelingJob. After you publish your request, you’ll see Unlabeled increases to 1 (or the number of objects you send to your labeling job).

[ ]:

The following will publish your request to your Amazon SNS input topic.

[ ]:
print(f"Your Request: {REQUEST}\n")
if REQUEST != "<Populate your object as shown above>":
    published_message = sns.publish(TopicArn=INPUT_SNS_TOPIC_ARN, Message=REQUEST)
    print(f"Published Message: {published_message}")

You may need to wait 1 to 2 minutes for your request to appear in LabelCounters.

[ ]:

Call StopLabelingJob for your previously launched job

To stop your Streaming job, call StopLabelingJob with the LABELING_JOB_NAME.

[ ]: