Automate multi-modality, parallel data labeling workflows with Amazon SageMaker Ground Truth and AWS Step Functions

Data labeling often requires a single data object to carry multiple types of annotations (multi-type labeling), such as 2D bounding boxes, lines, and segmentation masks, all on a single image. Additionally, to create high-quality machine learning (ML) models using labeled data, you need a way to monitor the quality of the labels. You can do this by creating a workflow in which labeled data is audited and adjusted as needed. This notebook introduces a solution to address both of these labeling challenges using an automotive dataset, and you can extend this solution for use with any type of dataset.

This notebook walks through an example situation where we generate multiple types of annotations for an automotive scene. Specifically, we'll run a total of four labeling jobs per input video clip: (1) an initial labeling of vehicles, (2) an initial labeling of lanes, and (3, 4) an adjustment job for each initial job.

This notebook only works if you have deployed the corresponding AWS CloudFormation template. You can deploy the solution in us-east-1 using the launch link provided with this solution. To deploy to other Regions or to customize the deployment, see the README.md provided with this notebook.

Recommended Kernel: conda_python3

While following along with this notebook, we recommend that you leave most of the cells unmodified. However, the notebook indicates where you can modify variables to create the resources needed for a custom labeling job.

Let’s start by importing required libraries and initializing session and other variables used in this notebook. By default, the notebook uses the default Amazon S3 bucket in the same AWS Region you use to run this notebook. If you want to use a different S3 bucket, make sure it is in the same AWS Region you use to complete this tutorial, and specify the bucket name for bucket.

[ ]:
!pip3 install requests_aws_sign
[ ]:
%matplotlib inline
from enum import Enum
from pprint import pprint
import uuid
import urllib
import os
import json
from glob import glob

import requests
import boto3
from PIL import Image
import sagemaker as sm

import requests_aws_sign
from requests_aws_sign import AWSV4Sign

Prerequisites

This notebook creates some of the resources you need to launch workflows with SageMaker Ground Truth. You must create and populate the following resources before executing this notebook:

  • API Gateway endpoint. An API Gateway endpoint is set up as part of the workflows solution. Obtain the API identifier (rest_api_id) and Region from the API Gateway console, and use them to assign the AWS_REST_API_ID, AWS_ACCOUNT_ID, and AWS_REGION variables below.

[ ]:
AWS_REST_API_ID = "<<ADD REST API Gateway ID HERE>>"
AWS_ACCOUNT_ID = "<<ADD AWS Account ID HERE>>"
AWS_REGION = region = boto3.session.Session().region_name
[ ]:
# Make sure api gateway id is populated
assert AWS_REST_API_ID != "<<ADD REST API Gateway ID HERE>>"

# Make sure AWS account number is populated
assert AWS_ACCOUNT_ID != "<<ADD AWS Account ID HERE>>"

# Make sure AWS Region is populated
assert AWS_REGION != "<<ADD AWS Region HERE>>"
[ ]:
BASE_URL = f"https://{AWS_REST_API_ID}.execute-api.{AWS_REGION}.amazonaws.com/prod"
API_URL_POST_BATCH_CREATE = f"{BASE_URL}/batch/create"
API_URL_GET_BATCH_SHOW = f"{BASE_URL}/batch/show"
API_URL_POST_BATCH_METADATA_POST = f"{BASE_URL}/batch-metadata/post"
API_URL_GET_WORKFORCE_SHOW = f"{BASE_URL}/workforce/show"

S3_INPUT_BUCKET = f"smgt-workflows-{AWS_ACCOUNT_ID}-{AWS_REGION}-batch-input"

credentials = boto3.session.Session().get_credentials()

sess = requests.Session()

sess.auth = AWSV4Sign(credentials, AWS_REGION, "execute-api")
  • The IAM execution role you used to create this notebook instance must have the following permissions:

    • If you do not require granular permissions for your use case, you can attach AmazonSageMakerFullAccess to your IAM user or role. If you are running this example in a SageMaker notebook instance, this is the IAM execution role used to create your notebook instance. If you need granular permissions, see Assign IAM Permissions to Use Ground Truth.

    • AWS managed policy AmazonSageMakerGroundTruthExecution. Run the following code block to see your IAM execution role name. For instructions on attaching this policy to a role in the IAM console, see the IAM User Guide: Adding and removing IAM identity permissions.

    • When you create your role, make sure you attach AmazonAPIGatewayInvokeFullAccess to it.

    • When you create your role, you specify Amazon S3 permissions. Make sure your IAM role has access to the S3 bucket this example uses. If you do not require granular permissions, you can attach AmazonS3FullAccess to your role.

[ ]:
role = sm.get_execution_role()
role_name = role.split("/")[-1]
print(
    "IMPORTANT: Make sure this execution role has the AWS managed policy AmazonSageMakerGroundTruthExecution attached."
)
print("********************************************************************************")
print("The IAM execution role name:", role_name)
print("The IAM execution role ARN:", role)
print("********************************************************************************")

Download Data

We are going to use a dataset from the Multi Object Tracking Challenge, a commonly used benchmark for multi-object tracking. First, we download the data; depending on your connection speed, this can take between 5 and 10 minutes. Then we unzip it and upload it to S3_INPUT_BUCKET in Amazon S3.

Disclosure regarding the Multiple Object Tracking Benchmark:

Multiple Object Tracking Benchmark is created by Patrick Dendorfer, Hamid Rezatofighi, Anton Milan, Javen Shi, Daniel Cremers, Ian Reid, Stefan Roth, Konrad Schindler, and Laura Leal-Taixe. We have not modified the images or the accompanying annotations. You can obtain the images and the annotations from the MOT Challenge website. The images and annotations are licensed by the authors under the Creative Commons Attribution-NonCommercial-ShareAlike 3.0 License. The following paper describes Multiple Object Tracking Benchmark in depth: from the data collection and annotation to detailed statistics about the data and evaluation of models trained on it.

MOT17: A benchmark for multi object tracking in crowded scenes. Patrick Dendorfer, Hamid Rezatofighi, Anton Milan, Javen Shi, Daniel Cremers, Ian Reid, Stefan Roth, Konrad Schindler, Laura Leal-Taixe arXiv:2003.09003

[ ]:
# Download dataset
!wget https://motchallenge.net/data/MOT17.zip -O /tmp/MOT17.zip
[ ]:
# Unzip dataset
!unzip -q /tmp/MOT17.zip -d MOT17
!rm /tmp/MOT17.zip
[ ]:
!ls MOT17/MOT17/train

Copy Dataset to S3

[ ]:
# send our data to s3 this will take a couple minutes
!aws s3 sync MOT17/MOT17/train/MOT17-13-SDP s3://{S3_INPUT_BUCKET}/MOT17/train/MOT17-13-SDP

View Images and Labels

The scene is a street setting with a large number of cars and pedestrians. Grab image paths and plot the first image.

[ ]:
img_paths = glob("MOT17/MOT17/train/MOT17-13-SDP/img1/*.jpg")
img_paths.sort()

imgs = []
for imgp in img_paths:
    img = Image.open(imgp)
    imgs.append(img)

# display the first frame
imgs[0]

Generate manifests

SageMaker Ground Truth operates using manifests. When using a modality like image classification, a single image corresponds to a single entry in a manifest and a given manifest will directly contain paths for all of the images to be labeled. For videos, because we have multiple frames per video and we can have multiple videos in a single manifest, it is organized instead by using a JSON sequence file for each video that contains the paths to our frames in S3. This allows a single manifest to contain multiple videos for a single job.
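For reference, here is a minimal example of the sequence file structure we will generate below, for a two-frame clip. The bucket name and frame names are placeholders, not real resources; the field names and the 2020-07-01 schema version match the generation code later in this notebook.

```python
import json

# Minimal Ground Truth video frame sequence file (illustrative values only;
# "example-bucket" and the frame names are placeholders).
example_seq = {
    "version": "2020-07-01",
    "seq-no": 0,
    "prefix": "s3://example-bucket/MOT17/train/MOT17-13-SDP/img1/",
    "number-of-frames": 2,
    "frames": [
        {"frame-no": 1, "frame": "000001.jpg"},
        {"frame-no": 2, "frame": "000003.jpg"},
    ],
}

print(json.dumps(example_seq, indent=2))
```

The prefix points at the S3 folder holding the frames, so each frame entry only needs a file name and an ordering index.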

In this case our image files are all split out, so we can just grab the file paths. If your data is in the form of video files, you can use the Ground Truth console to split videos into video frames. To learn more, see Automated Video Frame Input Data Setup. Other tools, such as ffmpeg, can also be used to split video files into individual image frames. The block below simply stores our file paths in a list.

[ ]:
vid = "MOT17/MOT17/train/MOT17-13-SDP"

# we assume we have folders with the same name as the mp4 file in the same root folder
files = glob(f"{vid}/img1/*jpg")
files.sort()
files = files[:300:2]  # keep every other frame from the first 300 (150 frames total)
fileset = []
for fil in files:
    fileset.append("/".join(fil.split("/")[5:]))

With your image paths, you can iterate through frames and create a list of entries for each in your sequence file.

[ ]:
# Generate sequences
os.makedirs("tracking_manifests", exist_ok=True)

frames = []
for i, v in enumerate(fileset):
    frame = {
        "frame-no": i + 1,
        "frame": f"{v.split('/')[-1]}",
    }
    frames.append(frame)
seq = {
    "version": "2020-07-01",
    "seq-no": 0,
    "prefix": f"s3://{S3_INPUT_BUCKET}/{'/'.join(vid.split('/')[1:])}/img1/",
    "number-of-frames": len(frames),
    "frames": frames,
}

print(seq)

# save sequences
with open(f"tracking_manifests/{vid.split('/')[-1]}_seq.json", "w") as f:
    json.dump(seq, f)

With your sequence file, you can create your manifest file. To create a new job with no existing labels, you can simply pass in a path to your sequence file. If you already have labels and instead want to launch an adjustment job, point to the location of those labels in Amazon S3 and provide metadata for those labels in your manifest.
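As an illustration, a manifest entry that chains an adjustment job from existing labels might look like the following sketch. The bucket name, label attribute name (vehicle-ref), and metadata values are placeholders following the Ground Truth chained-manifest convention; check the output manifest of your initial job for the exact fields it produced.

```python
import json

# Illustrative manifest entry for launching an adjustment job from existing
# labels. All values below are placeholders: "example-bucket", the job name,
# and the metadata fields are assumptions, not resources created by this
# notebook.
adjustment_manifest = {
    "source-ref": "s3://example-bucket/tracking_manifests/MOT17-13-SDP_seq.json",
    # label attribute: where the existing labels live in S3
    "vehicle-ref": "s3://example-bucket/tracking_manifests/SeqLabel.json",
    # metadata describing those labels
    "vehicle-ref-metadata": {
        "class-map": {"0": "Vehicle"},
        "job-name": "labeling-job/example-initial-vehicle-job",
        "human-annotated": "yes",
        "creation-date": "2021-01-01T00:00:00.000000",
        "type": "groundtruth/video-object-tracking",
    },
}

print(json.dumps(adjustment_manifest, indent=2))
```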

[ ]:
# create manifest
source_ref = f"s3://{S3_INPUT_BUCKET}/tracking_manifests/{vid.split('/')[-1]}_seq.json"
# existing labels (used when chaining an adjustment job from a manifest)
annot_labels = f"s3://{S3_INPUT_BUCKET}/tracking_manifests/SeqLabel.json"

manifest = {
    "source-ref": source_ref,
}

# save videos as individual jobs
manifest_file_name = f"tracking_manifests/{vid.split('/')[-1]}.manifest"
with open(manifest_file_name, "w") as f:
    json.dump(manifest, f)

manifest_s3_uri = f"s3://{S3_INPUT_BUCKET}/{manifest_file_name}"

print("Example manifest: ", manifest)

Generate Label Category Configuration Files

The following cell creates label category configuration files for the labeling jobs. These files are used to identify the label categories labelers use to annotate objects, and label category attributes which can be used to provide additional information about objects or scenes. For more information about these files, see Create a Labeling Category Configuration File with Label Category and Frame Attributes.

[ ]:
moving_attribute = {
    "name": "Moving",
    "type": "string",
    "enum": ["Stationary", "Dynamic"],
}
vehicle_type_attribute = {
    "name": "Vehicle_Type",
    "type": "string",
    "enum": ["Car", "Van", "Bus", "SUV"],
}
audit_attribute = {
    "name": "audit",
    "type": "string",
    "enum": ["Pass", "Fail"],
}

label_vehicle = {
    "document-version": "2020-08-15",
    "frameAttributes": [
        {
            "name": "Number_Of_Vehicles",
            "description": "How many vehicles do you see in the scene?",
            "type": "number",
        }
    ],
    "labels": [
        {
            "label": "Vehicle",
            "attributes": [moving_attribute, vehicle_type_attribute],
        },
    ],
    "instructions": {
        "shortInstruction": "Please label vehicles.",
        "fullInstruction": "Please label vehicles.",
    },
    "annotationType": "BoundingBox",
}
filename = "tracking_manifests/vehicle_label_category.json"
with open(filename, "w") as f:
    json.dump(label_vehicle, f)
vehicle_label_category_s3_uri = f"s3://{S3_INPUT_BUCKET}/{filename}"

label_vehicle_audit = {
    "document-version": "2020-08-15",
    "frameAttributes": [
        {
            "name": "Frame_Quality",
            "description": "Describe the quality of the frame",
            "type": "string",
            "enum": ["Pass", "Fail"],
        },
    ],
    "labels": [
        {
            "label": "Vehicle",
            "attributes": [audit_attribute],
        },
    ],
    "instructions": {
        "shortInstruction": "Please label vehicles.",
        "fullInstruction": "Please label vehicles.",
    },
    "annotationType": "BoundingBox",
}
filename = "tracking_manifests/vehicle_label_category_audit.json"
with open(filename, "w") as f:
    json.dump(label_vehicle_audit, f)
vehicle_audit_label_category_s3_uri = f"s3://{S3_INPUT_BUCKET}/{filename}"

label_lane = {
    "document-version": "2020-08-15",
    "labels": [
        {
            "label": "Lane",
        },
    ],
    "instructions": {
        "shortInstruction": "Please label lanes.",
        "fullInstruction": "Please label lanes.",
    },
    "annotationType": "Polyline",
}
filename = "tracking_manifests/lane_label_category.json"
with open(filename, "w") as f:
    json.dump(label_lane, f)
lane_label_category_s3_uri = f"s3://{S3_INPUT_BUCKET}/{filename}"

label_lane_audit = {
    "document-version": "2020-08-15",
    "labels": [
        {
            "label": "Lane",
            "attributes": [audit_attribute],
        },
    ],
    "instructions": {
        "shortInstruction": "Please label lanes.",
        "fullInstruction": "Please label lanes.",
    },
    "annotationType": "Polyline",
}
filename = "tracking_manifests/lane_label_category_audit.json"
with open(filename, "w") as f:
    json.dump(label_lane_audit, f)
lane_audit_label_category_s3_uri = f"s3://{S3_INPUT_BUCKET}/{filename}"
[ ]:
# Send data to S3
!aws s3 cp --recursive tracking_manifests s3://{S3_INPUT_BUCKET}/tracking_manifests

Batch Creation Demo

Now we’ll send the input manifest files that we just created through a custom Ground Truth pipeline, orchestrated as batch jobs by AWS Step Functions.

[ ]:
batch_id = f"nb-track-{str(uuid.uuid4())[:8]}"


# Perform image classification, bounding box, and polylines annotations.
batch_create_response = sess.post(
    API_URL_POST_BATCH_CREATE,
    json={
        "batchId": batch_id,
        "labelingJobs": [
            {
                "jobName": f"{batch_id}-vehicle",
                "jobType": "BATCH",
                "jobModality": "VideoObjectTracking",
                "labelAttributeName": "vehicle-ref",
                "labelCategoryConfigS3Uri": vehicle_label_category_s3_uri,
                "inputConfig": {
                    "inputManifestS3Uri": manifest_s3_uri,
                },
                "jobLevel": 1,
            },
            {
                "jobName": f"{batch_id}-lane",
                "jobType": "BATCH",
                "jobModality": "VideoObjectTracking",
                "labelAttributeName": "lane-ref",
                "labelCategoryConfigS3Uri": lane_label_category_s3_uri,
                "inputConfig": {
                    "inputManifestS3Uri": manifest_s3_uri,
                },
                "jobLevel": 1,
            },
            {
                "jobName": f"{batch_id}-vehicle-audit",
                "jobType": "BATCH",
                "jobModality": "VideoObjectTrackingAudit",
                "labelAttributeName": "vehicle-audit-ref",
                "labelCategoryConfigS3Uri": vehicle_audit_label_category_s3_uri,
                "inputConfig": {
                    "chainFromJobName": f"{batch_id}-vehicle",
                },
                "jobLevel": 2,
            },
            {
                "jobName": f"{batch_id}-lane-audit",
                "jobType": "BATCH",
                "jobModality": "VideoObjectTrackingAudit",
                "labelAttributeName": "lane-audit-ref",
                "labelCategoryConfigS3Uri": lane_audit_label_category_s3_uri,
                "inputConfig": {"chainFromJobName": f"{batch_id}-lane"},
                "jobLevel": 2,
            },
        ],
    },
)


print(batch_create_response.text)

Sign-in To Worker Portal

  • If you have added yourself to the work teams created by the CloudFormation template, use the following URL to log in to the worker portal.

  • If you have not added yourself to a work team, use the instructions in Add or Remove Workers to add yourself to the following work teams: smgt-workflows-first-level and smgt-workflows-second-level.

[ ]:
sagemaker = boto3.client("sagemaker")
workteam = sagemaker.describe_workteam(WorkteamName="smgt-workflows-second-level")
worker_portal_url = workteam["Workteam"]["SubDomain"]
print(f"Sign-in by going here: {worker_portal_url}")

Complete Tasks In Worker Portal

  • Sign in and wait for a task to appear in the worker portal

  • Complete the task by choosing Submit in the bottom right corner

  • Make sure you wait long enough to see both frames appear in the worker console, and submit both

  • Once you're done, a notification comes over the job output SNS topic indicating that the batch has completed first-level review and is waiting for batch downsampling metadata before continuing

Batch Show Demo

  • Now our batch is being processed and will show up in the worker portal within a few minutes

  • We can see a list of all the batches being processed using the batch/show API without arguments

[ ]:
batch_show_response = sess.get(API_URL_GET_BATCH_SHOW)
print(f"Your batch: {batch_id} should be present in the list below")
batch_show_response.json()["COMPLETE"]

Batch Detailed Show Demo

[ ]:
# Choose a batch of interest (e.g. the batch_id created above)
batch_id = "add-batch-name"
[ ]:
# Now let's see details about the batch
batch_show_response = sess.get(API_URL_GET_BATCH_SHOW, params={"batchId": batch_id})
batch_show_response.json()

Workforce Show Demo

  • Make sure an email you have access to is in the Workforce

[ ]:
sess.get(API_URL_GET_WORKFORCE_SHOW).json()

Batch Metadata Post

  • Here we provide the downsample rate for the batch that we want to send to the QA Manager step. Let's specify a downsample rate of 50% so that only one of the two frames is sent to the manager queue.

[ ]:
batch_metadata_post = sess.post(
    API_URL_POST_BATCH_METADATA_POST,
    json={
        "downSamplingRate": 50,
        "batchId": batch_id,
    },
)
batch_metadata_post.text
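To see what a 50% rate means for this two-frame batch, here is a rough sketch of the downsampling arithmetic. The helper function is hypothetical (not part of the solution's API), and the exact rounding behavior of the pipeline is an assumption.

```python
# Hypothetical sketch of downsampling: the service's actual rounding rules
# may differ. With 2 completed frames and a 50% rate, 1 frame is forwarded
# to the second-level queue.
def downsampled_count(num_frames, rate_percent):
    if rate_percent <= 0:
        return 0
    kept = int(num_frames * rate_percent / 100)
    # assumption: at least one frame is forwarded for any non-zero rate
    return max(kept, 1)

print(downsampled_count(2, 50))  # 1 of 2 frames forwarded
```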

Complete Second Level Reviewer Tasks In Worker Portal

  • After the batch metadata is posted, the frames are downsampled and sent to the next-level queue

[ ]:
# Now we can watch batch status until completion.
batch_show_response = sess.get(API_URL_GET_BATCH_SHOW, params={"batchId": batch_id})
batch_show_response.json()["status"]
[ ]:
# Once complete, let's grab relevant manifests.
batch_show_response = sess.get(API_URL_GET_BATCH_SHOW, params={"batchId": batch_id})
# first_level_output_manifest_url = batch_show_response.json()["firstLevel"]["outputS3PresignedUrl"]
second_level_output_manifest_url = batch_show_response.json()["secondLevel"]["jobLevels"][0][
    "jobOutputS3Url"
]
# print(f"First level output manifest url: {first_level_output_manifest_url}")
print(f"Second level output manifest url: {second_level_output_manifest_url}")
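The output manifest at that URL is a JSON Lines file: one JSON object per input sequence, containing source-ref plus a label attribute per job. The following sketch parses an inline sample rather than performing a live download, and its field values are placeholders; with a real presigned URL you could fetch the text with requests.get(second_level_output_manifest_url).text and parse it the same way.

```python
import json

# Illustrative output-manifest content (placeholders, not real S3 paths).
# A real manifest is downloaded from the presigned URL returned by batch/show.
sample_manifest_text = (
    '{"source-ref": "s3://example-bucket/tracking_manifests/MOT17-13-SDP_seq.json", '
    '"vehicle-ref": "s3://example-bucket/output/SeqLabel.json"}\n'
)

# JSON Lines: parse each non-empty line as its own JSON object
records = [json.loads(line) for line in sample_manifest_text.splitlines() if line.strip()]
for record in records:
    print(record["source-ref"])
```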

Conclusion

  • This notebook introduced how to take video frame data and trigger a workflow to run multiple Ground Truth labeling jobs, generating two different types of annotations (bounding boxes and polylines). You also learned how you can extend the pipeline to audit and verify the labeled dataset and how to retrieve the audited results. Lastly, you saw how to reference the current progress of batch jobs using the BatchShow API.

Customize the notebook for your input datasets by adding additional jobs or audit steps, or by modifying the data modality of the jobs. Further customization could include, but is not limited to:

  • Adding additional types of annotations, such as semantic segmentation masks or keypoints

  • Adding automated quality assurance and filtering to the Step Functions workflow so that only low-quality annotations are sent to the next level of review

  • Adding third or fourth levels of quality review for additional, more specialized types of reviews

Clean Up

  • After completing the demo, you can clean up all the resources using this section

[ ]:
# Enter the top-level CloudFormation stack name, e.g. smgt-workflow-v1
cloud_formation_stack_name = ""
[ ]:
cfn_client = boto3.client("cloudformation")
s3 = boto3.resource("s3")

response = cfn_client.list_stack_resources(StackName=cloud_formation_stack_name)

resource_summaries = response["StackResourceSummaries"]

# Empty every S3 bucket created by the nested stacks so the stacks can be deleted
for resource_summary in resource_summaries:
    # only nested stacks have resources of their own to inspect
    if resource_summary["ResourceType"] != "AWS::CloudFormation::Stack":
        continue
    stack_name = resource_summary["PhysicalResourceId"].split("/")[1]
    nested_stack_resources = cfn_client.list_stack_resources(StackName=stack_name)[
        "StackResourceSummaries"
    ]

    for nested_stack_resource in nested_stack_resources:
        if nested_stack_resource["ResourceType"] == "AWS::S3::Bucket":
            s3_bucket_name = nested_stack_resource["PhysicalResourceId"]
            bucket = s3.Bucket(s3_bucket_name)
            bucket.objects.all().delete()


response = cfn_client.delete_stack(StackName=cloud_formation_stack_name)

print(
    "CloudFormation stack deletion triggered successfully. You can monitor the deletion progress in the CloudFormation console."
)