Annotation of Dense Point Clouds Using Amazon SageMaker Ground Truth

This notebook walks through how to annotate dense point clouds using SageMaker Ground Truth. We define dense point clouds as point clouds with a high number of points per scene (> 500k) and enough points relative to the feature size of the scene that labelers can still make out objects (cars, pedestrians, etc.) while looking at only a subset of the scene’s total points. When these conditions are met, you can often improve the labeling experience and throughput by reducing the scene’s point cloud size through downsampling. After labeling, you can either use the labels directly on the full-size scene for some modalities (like object detection bounding boxes), or you can “upsample” your labels by performing in-sample prediction on the unlabeled points.
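The end-to-end flow described above (downsample, label, then upsample the labels) can be sketched on toy data; every name and value below is illustrative rather than taken from this notebook:

```python
import numpy as np
from sklearn.neighbors import KNeighborsClassifier

rng = np.random.default_rng(0)
full_pc = rng.random((10_000, 3))  # stand-in for a dense scene

# 1) downsample by a fixed stride so labelers see fewer points
downsampled = full_pc[::10]

# 2) pretend a labeler assigned a class to every downsampled point
labels = (downsampled[:, 2] > 0.5).astype(int)

# 3) "upsample" the labels back to the full scene with nearest-neighbor prediction
knn = KNeighborsClassifier(n_neighbors=3).fit(downsampled, labels)
full_labels = knn.predict(full_pc)

print(downsampled.shape, full_labels.shape)  # (1000, 3) (10000,)
```

Here a simple stride stands in for the labeler-facing downsampling, and a k-NN classifier stands in for the label upsampling step shown later in this notebook.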

Recommended Kernel: conda_python3

Let’s start by importing the required libraries and initializing the session and other variables used in this notebook. By default, the notebook uses the default Amazon S3 bucket in the same AWS Region you use to run this notebook. If you want to use a different S3 bucket, make sure it is in the same AWS Region you use to complete this tutorial, and specify its name in the bucket variable below. It is also recommended to use an instance size with at least 16 GB of RAM.

[ ]:
%matplotlib inline
import boto3
import json
import uuid
import zlib
from array import array

import numpy as np
import matplotlib.pyplot as plt
import sagemaker as sm
import scipy.stats
from sklearn.neighbors import KNeighborsClassifier
from mpl_toolkits.mplot3d import Axes3D


You will create some of the resources you need to launch a Ground Truth labeling job in this notebook. You must create the following resources before executing this notebook:

  • A work team. A work team is a group of workers that complete labeling tasks. If you want to preview the worker UI and execute the labeling task you will need to create a private work team, add yourself as a worker to this team, and provide the work team ARN below. This GIF demonstrates how to quickly create a private work team on the Amazon SageMaker console. To learn more about private, vendor, and Amazon Mechanical Turk workforces, see Create and Manage Workforces.

[ ]:

# Make sure workteam arn is populated if private work team is chosen
WORKTEAM_ARN = "<< YOUR WORK TEAM ARN >>"
  • The IAM execution role you used to create this notebook instance must have the following permissions:

    • If you do not require granular permissions for your use case, you can attach AmazonSageMakerFullAccess to your IAM user or role. If you are running this example in a SageMaker notebook instance, this is the IAM execution role used to create your notebook instance. If you need granular permissions, see Assign IAM Permissions to Use Ground Truth for a granular policy to use Ground Truth.

    • AWS managed policy AmazonSageMakerGroundTruthExecution. Run the following code-block to see your IAM execution role name. This GIF demonstrates how to attach this policy to an IAM role in the IAM console. You can also find instructions in the IAM User Guide: Adding and removing IAM identity permissions.

    • When you create your role, you specify Amazon S3 permissions. Make sure that your IAM role has access to the S3 bucket that you plan to use in this example. If you do not specify an S3 bucket in this notebook, the default bucket in the AWS region you are running this notebook instance will be used. If you do not require granular permissions, you can attach AmazonS3FullAccess to your role.

[ ]:
role = sm.get_execution_role()
role_name = role.split("/")[-1]
# IMPORTANT: Make sure this IAM role has one or more IAM policies with the permissions described above attached.
print("The IAM execution role name:", role_name)
print("The IAM execution role ARN:", role)

sagemaker_cl = boto3.client("sagemaker")
# Make sure the bucket is in the same region as this notebook.
bucket = "<< YOUR S3 BUCKET NAME >>"

sm_session = sm.Session()
s3 = boto3.client("s3")

if bucket == "<< YOUR S3 BUCKET NAME >>":
    bucket = sm_session.default_bucket()
region = boto3.session.Session().region_name
bucket_region = s3.head_bucket(Bucket=bucket)["ResponseMetadata"]["HTTPHeaders"][
    "x-amz-bucket-region"
]
assert (
    bucket_region == region
), f"Your S3 bucket {bucket} and this notebook need to be in the same region."
print(f"IMPORTANT: make sure the role {role_name} has access to read and write to this bucket.")
print(f"This notebook will use the following S3 bucket: {bucket}")

If you don’t have a CORS policy attached to your S3 bucket, workers will be unable to load data in the labeling task. You can attach the following CORS policy to the bucket (the AllowedMethods and AllowedOrigins values shown here follow the Ground Truth documentation):

[
    {
        "AllowedHeaders": [],
        "AllowedMethods": ["GET"],
        "AllowedOrigins": ["*"],
        "ExposeHeaders": [],
        "MaxAgeSeconds": 3000
    }
]

Section 1: Download Data

You can download the data from the following location. There is an object that you will visualize, downsample, and then label. This data is a scan of an apartment building rooftop generated using the 3D Scanner App on an iPhone 12 Pro, which lets users scan a given area and export a point cloud file. In this case the point cloud data is in xyzrgb format, an accepted format for SageMaker Ground Truth point clouds. For more information on the data types allowed in SageMaker Ground Truth point cloud jobs, refer to the following documentation:

[ ]:
!wget -O
[ ]:
# Let's read our dataset into a numpy array; we'll call our point cloud "pc" for short.
# Our point cloud consists of 3D euclidean coordinates of each point and RGB color values for each point.
# For example:
# p1_x, p1_y, p1_z, p1_r, p1_g, p1_b
# p2_x, p2_y, p2_z, p2_r, p2_g, p2_b
# p3_x, p3_y, p3_z, p3_r, p3_g, p3_b
# ...
pc = np.loadtxt("", delimiter=",")

print(f"Loaded points of shape {pc.shape}")

Visualize our pointcloud

You can visualize the point cloud using matplotlib’s scatter3d function. The point cloud file contains all of the correct points, but in this case it is not rotated correctly. You can rotate the object around its axes by multiplying the point cloud by a rotation matrix. You can obtain a rotation matrix using scipy by specifying the degree changes you would like to make to each axis with the from_euler method.

[ ]:
# playing with view of 3D scene

from scipy.spatial.transform import Rotation

def plot_pointcloud(
    pc, rot=[[0, 0, 0]], color=True, title="Simple Downsampling 1", figsize=(50, 25), verbose=False
):
    rot1 = Rotation.from_euler("zyx", rot, degrees=True)
    R1 = rot1.as_matrix()
    if verbose:
        print("Rotation matrix:", "\n", R1)

    # matrix multiplication between our rotation matrix and pointcloud
    pc_show = np.matmul(R1, pc.copy()[:, :3].transpose()).transpose().squeeze()
    if color:
        rot_color = pc.copy()[:, 3:]

    fig = plt.figure(figsize=figsize)
    ax = fig.add_subplot(111, projection="3d")
    ax.set_title(title, fontdict={"fontsize": 20})
    if color:
        # need to create a list of tuples for matplotlib to plot RGB color values
        color_tuple_list = []
        for i in range(pc_show.shape[0]):
            color_tuple = (
                int(np.abs(rot_color[i, 0])) / 255,
                int(np.abs(rot_color[i, 1])) / 255,
                int(np.abs(rot_color[i, 2])) / 255,
            )
            color_tuple_list.append(color_tuple)
        ax.scatter(pc_show[:, 0], pc_show[:, 1], pc_show[:, 2], c=color_tuple_list, s=0.05)
    else:
        ax.scatter(pc_show[:, 0], pc_show[:, 1], pc_show[:, 2], c="blue", s=0.05)

# rotate in z direction 30 degrees, y direction 90 degrees, and x direction 60 degrees
# only run this once!
rot = Rotation.from_euler("zyx", [[30, 90, 60]], degrees=True)
R = rot.as_matrix()
print("Rotation matrix:", "\n", R)
pc_rot = np.matmul(R, pc[:, :3].transpose()).transpose().squeeze()
pc_color = pc.copy()[:, 3:]
pc = np.zeros_like(pc)
pc[:, :3] = pc_rot
pc[:, 3:] = pc_color
[ ]:
# view full pointcloud, will take ~10 seconds to render depending on the instance type you are using

plot_pointcloud(pc, rot=[[0, 0, 0]], color=True, title="Full Pointcloud", figsize=(50, 30))

Section 2: Downsample Approaches

Just like in image processing, there are many different approaches to downsampling a point cloud. Certain approaches may preserve more information relevant for labelers, or may have different noise characteristics. You’ll walk through two approaches: a very basic linear sampling and a more advanced voxel mean method.

Other alternatives include 3D box filtering, median filtering, and even random sampling.
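For reference, random sampling, one of the alternatives mentioned above, is only a few lines with NumPy (the array and sizes here are made up for illustration):

```python
import numpy as np

rng = np.random.default_rng(0)
pc_demo = rng.random((1_000_000, 6))  # toy xyzrgb point cloud

# random downsampling: draw a fixed-size subset of row indices without replacement
target_num_pts = 500_000
idx = rng.choice(len(pc_demo), size=target_num_pts, replace=False)
pc_random = pc_demo[idx]

print(pc_random.shape)  # (500000, 6)
```

Random sampling preserves the density distribution of the original scene on average, but unlike voxel methods it does not guarantee even coverage of sparse regions.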

I: Basic Approach

The simplest form of downsampling is to choose values using a fixed step size based on how large we want our resulting point cloud to be.

[ ]:
target_num_pts = 500_000
subsample = int(np.ceil(len(pc) / target_num_pts))
pc_downsample_simple = pc[::subsample]
print(f"We've subsampled to {len(pc_downsample_simple)} points")

II: Advanced Approach

A more advanced approach is to break the input space into cubes, otherwise known as voxels, and choose a single point per box using an averaging function.

In this example you will use cube sizes of 1.3 cm, which will partition the entire point cloud scene into 1.3 cm × 1.3 cm × 1.3 cm cubes. Then, for each cube, you get the list of points contained within it. Given the coordinates of this list of points, you take the centroid, that is, the mean across each coordinate dimension, to get a single point representative of the overall set of points within the voxel. You run the operation twice over the same xyz bins: once averaging the xyz values and once averaging the RGB color values, so that each representative point preserves both position and color.

You will use scipy’s binned_statistic_dd to efficiently compute these representative points per voxel, then construct a new point cloud using these representative points and use it for downstream labeling.
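If binned_statistic_dd is unfamiliar, a three-point toy cloud may make the voxel-centroid idea concrete; the points and bin counts below are invented for illustration:

```python
import numpy as np
import scipy.stats

# three toy points; the first two fall in the same voxel when we use 2 bins per axis
pts = np.array([
    [0.1, 0.1, 0.1],
    [0.3, 0.3, 0.3],
    [0.9, 0.9, 0.9],
])
stat, _, _ = scipy.stats.binned_statistic_dd(
    pts,
    [pts[:, 0], pts[:, 1], pts[:, 2]],
    statistic="mean",
    bins=[2, 2, 2],
    range=[(0, 1)] * 3,
)
# the voxel containing the first two points is summarized by their centroid
print(stat[0][0, 0, 0], stat[1][0, 0, 0], stat[2][0, 0, 0])  # each ≈ 0.2
```

Empty voxels come back as NaN, which is why the code below filters with ~np.isnan before stacking the representative points.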

[ ]:
boxsize = 0.013  # 1.3 cm box size.
mins = pc[:, :3].min(axis=0)
maxes = pc[:, :3].max(axis=0)
volume = maxes - mins
num_boxes_per_axis = np.ceil(volume / boxsize).astype("int32").tolist()

print("Number of boxes per axis", num_boxes_per_axis)

# For each voxel or "box", summarize the points inside it by their mean (centroid).
means, _, _ = scipy.stats.binned_statistic_dd(
    pc[:, :3],
    [pc[:, 0], pc[:, 1], pc[:, 2]],
    statistic="mean",
    bins=num_boxes_per_axis,
)

# run a second operation over the color channels
means2, _, _ = scipy.stats.binned_statistic_dd(
    pc[:, :3],
    [pc[:, 3], pc[:, 4], pc[:, 5]],
    statistic="mean",
    bins=num_boxes_per_axis,
)

x_means = means[0, ~np.isnan(means[0])].flatten()
y_means = means[1, ~np.isnan(means[1])].flatten()
z_means = means[2, ~np.isnan(means[2])].flatten()
r_means = means2[0, ~np.isnan(means2[0])].flatten()
g_means = means2[1, ~np.isnan(means2[1])].flatten()
b_means = means2[2, ~np.isnan(means2[2])].flatten()

pc_downsample_adv = np.column_stack([x_means, y_means, z_means, r_means, g_means, b_means])
print("downsampled pointcloud shape:", pc_downsample_adv.shape)

del x_means, y_means, z_means, r_means, g_means, b_means, means, means2, num_boxes_per_axis

Section 3: Visualize 3D Rendering

You can visualize pointclouds using a 3D scatter plot of the points. You will see that the advanced voxel mean method creates a “smoother” point cloud as averaging has a noise reduction effect.

[ ]:
fig = plt.figure(figsize=(50, 30))
ax = fig.add_subplot(121, projection="3d")
ax.set_title("Simple Downsampling 1", fontdict={"fontsize": 20})
color_tuple_list = []
for i in range(pc_downsample_simple.shape[0]):
    color_tuple = (
        int(pc_downsample_simple[i, 3]) / 255,
        int(pc_downsample_simple[i, 4]) / 255,
        int(pc_downsample_simple[i, 5]) / 255,
    )
    color_tuple_list.append(color_tuple)
ax.scatter(
    pc_downsample_simple[:, 0],
    pc_downsample_simple[:, 1],
    pc_downsample_simple[:, 2],
    c=color_tuple_list,
    s=0.05,
)

ax = fig.add_subplot(122, projection="3d")
ax.set_title("Voxel Mean Downsampling 1", fontdict={"fontsize": 20})
color_tuple_list = []
for i in range(pc_downsample_adv.shape[0]):
    color_tuple = (
        int(pc_downsample_adv[i, 3]) / 255,
        int(pc_downsample_adv[i, 4]) / 255,
        int(pc_downsample_adv[i, 5]) / 255,
    )
    color_tuple_list.append(color_tuple)
ax.scatter(
    pc_downsample_adv[:, 0],
    pc_downsample_adv[:, 1],
    pc_downsample_adv[:, 2],
    c=color_tuple_list,
    s=0.05,
)

Below you can look at the point clouds from different perspectives by running the following function, which will multiply the pointclouds by the specified rotation matrices. The current setting will rotate the pointcloud 90 degrees to the right.

[ ]:
# try some different angles
plot_pointcloud(
    pc_downsample_adv, rot=[[90, 0, 0]], color=True, title="Advanced Downsampling", figsize=(50, 30)
)

Section 4: Amazon SageMaker Ground Truth (SMGT) Labeling Job

You can submit this downsampled point cloud as a SageMaker Ground Truth job by converting to SMGT labeling format, then creating a labeling job using this point cloud as input. You only save the points themselves since you are going to launch a semantic segmentation labeling task where you will label the points using distinct colors.

[ ]:
# Save in Ground Truth format
# See:

np.savetxt("pointcloud.txt", pc_downsample_adv[:, :3], delimiter=" ")
[ ]:
!aws s3 cp pointcloud.txt s3://{bucket}/input/pointcloud.txt

In the following cell, you generate an input manifest file. This file tells Ground Truth where 3D point cloud input data is located and contains metadata about your input data. To learn more, see Create an Input Manifest File for a 3D Point Cloud Labeling Job.

[ ]:
# Generate an input manifest.
input_manifest = json.dumps(
    {
        "source-ref": f"s3://{bucket}/input/pointcloud.txt",
        "source-ref-metadata": {
            "format": "text/xyz",
            "unix-timestamp": 1566861644.759115,
        },
    }
)

input_manifest_name = "input.manifest"
with open(input_manifest_name, "w") as f:
    f.write(input_manifest)

input_manifest_s3_uri = f"s3://{bucket}/input/input.manifest"
!aws s3 cp {input_manifest_name} {input_manifest_s3_uri}

In the following cell, you generate a label category configuration file. This file contains the label categories workers use to label the 3D point cloud, worker instructions, and other information about your task. To learn more about all of the options you have when you create a label category configuration file, see Create a Labeling Category Configuration File with Label Category and Frame Attributes.

[ ]:
# Generate label category configuration.
label_category_config = json.dumps(
    {
        "labels": [
            {"label": "Floor", "attributes": []},
            {"label": "Wall", "attributes": []},
            {"label": "Furniture", "attributes": []},
            {"label": "Plant", "attributes": []},
        ],
        "frameAttributes": [],
        "categoryGlobalAttributes": [],
        "instructions": {
            "shortInstruction": "Label",
            "fullInstruction": '<ul><li>After you’ve painted an object, use the <strong>Unpaint</strong> category to remove paint.&nbsp;</li><li>If you need to paint an object that is partially blocked by another object that you have painted, select the Paint cloud as background tool before you start painting.</li><li>Use the <strong>Shortcuts</strong> menu to see keyboard shortcuts that you can use to label objects faster.</li><li>Use the <strong>View</strong> menu to modify your view of the 3D point cloud and the worker portal.</li><li>Use the <strong>3D Point Cloud&nbsp;</strong>menu to modify the perspective of and pixel-attributes you see in the 3D point cloud.</li><li>Use this <a href="" rel="noopener noreferrer" target="_blank">resource</a> to learn about worker portal navigation, tools available to complete your task, icons, and view options.</li></ul>',
        },
        "document-version": "2020-03-01",
    }
)

label_category_config_filename = "labels.json"
with open(label_category_config_filename, "w") as f:
    f.write(label_category_config)

label_category_config_s3_uri = f"s3://{bucket}/input/labels.json"
!aws s3 cp {label_category_config_filename} {label_category_config_s3_uri}

In the following cell you configure the labeling job request. To learn more about the parameters used to configure a labeling job, see Create a Labeling Job (API).

[ ]:
hash_str = str(uuid.uuid4())[:4]
job_name = f"downsample-demo-{hash_str}"

lambda_account_id = {
    "us-east-1": "432418664414",
    "us-east-2": "266458841044",
    "us-west-2": "081040173940",
    "ca-central-1": "918755190332",
    "eu-west-1": "568282634449",
    "eu-west-2": "487402164563",
    "eu-central-1": "203001061592",
    "ap-northeast-1": "477331159723",
    "ap-northeast-2": "845288260483",
    "ap-south-1": "565803892007",
    "ap-southeast-1": "377565633583",
    "ap-southeast-2": "454466003867",
}[region]

job_creation_request = {
    "LabelingJobName": job_name,
    "LabelAttributeName": f"{job_name}-ref",
    "InputConfig": {
        "DataSource": {
            "S3DataSource": {
                "ManifestS3Uri": input_manifest_s3_uri,
            }
        },
        "DataAttributes": {"ContentClassifiers": []},
    },
    "OutputConfig": {
        "S3OutputPath": f"s3://{bucket}/output",
        "KmsKeyId": "",
    },
    "RoleArn": role,
    "LabelCategoryConfigS3Uri": label_category_config_s3_uri,
    "StoppingConditions": {"MaxPercentageOfInputDatasetLabeled": 100},
    "HumanTaskConfig": {
        "WorkteamArn": WORKTEAM_ARN,
        "UiConfig": {
            "HumanTaskUiArn": f"arn:aws:sagemaker:{region}:394669845002:human-task-ui/PointCloudSemanticSegmentation"
        },
        "PreHumanTaskLambdaArn": f"arn:aws:lambda:{region}:{lambda_account_id}:function:PRE-3DPointCloudSemanticSegmentation",
        "TaskKeywords": ["Point cloud", "segmentation"],
        "TaskTitle": job_name,
        "TaskDescription": "Create a semantic segmentation mask by painting objects in a 3D point cloud",
        "NumberOfHumanWorkersPerDataObject": 1,
        "TaskTimeLimitInSeconds": 604800,
        "TaskAvailabilityLifetimeInSeconds": 864000,
        "MaxConcurrentTaskCount": 1000,
        "AnnotationConsolidationConfig": {
            "AnnotationConsolidationLambdaArn": f"arn:aws:lambda:{region}:{lambda_account_id}:function:ACS-3DPointCloudSemanticSegmentation"
        },
    },
}

# Review the request for any errors before submitting.
print(json.dumps(job_creation_request, indent=2))
[ ]:
sagemaker_cl.create_labeling_job(**job_creation_request)

Label Data in the Worker Portal

Run the following cell to see the worker portal URL of your private workforce. Use this URL to log in to the worker portal and see your labeling tasks. See the 3D point cloud worker instructions to learn more about the features you’ll see in the worker UI.

[ ]:
workforce = sagemaker_cl.describe_workforce(WorkforceName="default")
worker_portal_url = workforce["Workforce"]["SubDomain"]
print(f"Sign-in by going here: {worker_portal_url}")

Wait for Job Completion

After submitting the job in the worker portal, wait until the labeling job status is “Completed”.

[ ]:
job = sagemaker_cl.describe_labeling_job(LabelingJobName=job_name)
status = job["LabelingJobStatus"]
print(f"Status: '{status}'")
assert status == "Completed"

label_attribute_name = job["LabelAttributeName"]

output_manifest_s3_uri = job["LabelingJobOutput"]["OutputDatasetS3Uri"]
!aws s3 cp {output_manifest_s3_uri} output.manifest

with open("output.manifest") as f:
    manifest_line = f.readlines()[0]

data_object = json.loads(manifest_line)
labels_s3_uri = data_object[label_attribute_name]

!aws s3 cp {labels_s3_uri} pointcloud.zlib

Load Annotations From S3

[ ]:
def semantic_segmentation_to_classes(filename):
    """Convert from class map labels to numpy array."""
    with open(filename, "rb") as f:
        binary_content = zlib.decompress(f.read())
    my_classes = array("B", binary_content)
    return np.array(my_classes)

annotations = semantic_segmentation_to_classes("pointcloud.zlib")

# Verify we have a label for each point in the downsampled point cloud.
assert len(annotations) == len(pc_downsample_adv)
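
The decoded label file format is simple: a zlib-compressed stream with one unsigned byte (class ID) per point, in the same order as the input point cloud. A quick round trip with made-up class IDs illustrates this:

```python
import zlib
from array import array
import numpy as np

# one class ID (0-255) per point, in point order
class_ids = bytes([0, 1, 2, 1, 0])
compressed = zlib.compress(class_ids)

# decode the same way semantic_segmentation_to_classes decodes the Ground Truth output
decoded = np.array(array("B", zlib.decompress(compressed)))
print(decoded)  # [0 1 2 1 0]
```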

Section 5: KNN Upsample Approach

Now that you have the downsampled labels, you can train a k-NN classifier from scikit-learn to predict labels for the full dataset by treating the annotated points as training data and performing inference on the remaining unlabeled points in the full-size point cloud. Note that you can tune the number of neighbors as well as the distance metric and weighting scheme to influence how label inference is performed. If you label a few tiles in the full-size dataset, you can use those labeled tiles as ground truth to evaluate the accuracy of the k-NN predictions. You can then use this accuracy metric for hyperparameter tuning of k-NN, or to try different inference algorithms to reduce the number of misclassified points along object boundaries, resulting in the lowest possible in-sample error rate.
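One way to do the tuning described above, sketched on synthetic stand-in data (none of these points or labels come from the notebook; with real data you would use your labeled tiles instead):

```python
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(0)
# synthetic stand-in for a labeled tile: points with a known class per point
pts = rng.random((2_000, 3))
true_labels = (pts[:, 0] + pts[:, 1] > 1.0).astype(int)

train_pts, test_pts, train_y, test_y = train_test_split(
    pts, true_labels, test_size=0.25, random_state=0
)

# sweep k and the weighting scheme, keep the most accurate setting
best = None
for k in (1, 3, 5, 9):
    for weights in ("uniform", "distance"):
        knn = KNeighborsClassifier(n_neighbors=k, weights=weights).fit(train_pts, train_y)
        acc = knn.score(test_pts, test_y)
        if best is None or acc > best[0]:
            best = (acc, k, weights)

print(f"best accuracy {best[0]:.3f} with k={best[1]}, weights={best[2]}")
```

With real labeled tiles you would split those points into train/test sets the same way and keep whichever (k, weights) setting scores best.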

[ ]:
# There's a lot of possibility to tune KNN further
# 1) Prevent classification of points far away from all other points (random unfiltered ground point)
# 2) Perform a non-uniform weighted vote
# 3) Tweak number of neighbors
knn = KNeighborsClassifier(n_neighbors=3)
print(f"Training on {len(pc_downsample_adv)} labeled points")
knn.fit(pc_downsample_adv[:, :3], annotations)

print(f"Upsampled to {len(pc)} labeled points")
annotations_full = knn.predict(pc[:, :3])

Section 6: Visualize 3D Rendering

Now that you’ve performed upsampling of the labeled data, you can visualize a tile of the original full size point cloud.

[ ]:
pc_downsample_annotated = np.column_stack((pc_downsample_adv[:, :3], annotations))
pc_annotated = np.column_stack((pc[:, :3], annotations_full))

labeled_area = pc_downsample_annotated[pc_downsample_annotated[:, 3] != 255]
min_bounds = np.min(labeled_area, axis=0)
max_bounds = np.max(labeled_area, axis=0)

# manually chosen tile bounds (these override the computed bounds above)
min_bounds = [-2, -2, -5.5, -1]
max_bounds = [2, 2, 0, 256]

def extract_tile(point_cloud, min_bounds, max_bounds):
    return point_cloud[
        (point_cloud[:, 0] > min_bounds[0])
        & (point_cloud[:, 1] > min_bounds[1])
        & (point_cloud[:, 2] > min_bounds[2])
        & (point_cloud[:, 0] < max_bounds[0])
        & (point_cloud[:, 1] < max_bounds[1])
        & (point_cloud[:, 2] < max_bounds[2])
    ]

tile_downsample_annotated = extract_tile(pc_downsample_annotated, min_bounds, max_bounds)
tile_annotated = extract_tile(pc_annotated, min_bounds, max_bounds)
[ ]:
down_rot = tile_downsample_annotated.copy()
down_rot_color = tile_downsample_annotated.copy()[:, 3:].astype("int32")
# change the color scheme to clearly display classes
down_rot_color[down_rot_color == 0] = 50
down_rot_color[down_rot_color == 1] = 100
down_rot_color[down_rot_color == 2] = 150
down_rot_color[down_rot_color == 3] = 200

full_rot = tile_annotated.copy()
full_rot_color = tile_annotated.copy()[:, 3:].astype("int32")
full_rot_color[full_rot_color == 0] = 50
full_rot_color[full_rot_color == 1] = 100
full_rot_color[full_rot_color == 2] = 150
full_rot_color[full_rot_color == 3] = 200

fig = plt.figure(figsize=(50, 20))
ax = fig.add_subplot(121, projection="3d")
ax.set_title("Downsampled Annotations", fontdict={"fontsize": 25})
ax.scatter(down_rot[:, 0], down_rot[:, 1], down_rot[:, 2], c=down_rot_color[:, 0], s=0.05)
ax = fig.add_subplot(122, projection="3d")
ax.set_title("Upsampled Annotations", fontdict={"fontsize": 25})
ax.scatter(full_rot[:, 0], full_rot[:, 1], full_rot[:, 2], c=full_rot_color[:, 0], s=0.05)


This notebook demonstrated a couple of approaches to downsampling point clouds for data labeling use cases. These approaches can also be applied as preprocessing steps before training and inference. The notebook demonstrated a simple step-based approach as well as a more complex approach in which you broke the point cloud bounds into voxels and replaced each voxel with a summary point created by taking the mean of all points in the voxel.

You then learned how to submit these point clouds to a SageMaker Ground Truth semantic segmentation job. Finally, you learned how to apply the resulting labels to the full-size point cloud using in-sample prediction based on the k-nearest neighbors algorithm.

Using these approaches, you can save time and effort labeling high-density pointclouds by avoiding tiling and instead labeling downsampled pointclouds.


If for some reason you didn’t complete the labeling job, you can run the following command to stop it.

[ ]:
sagemaker_cl.stop_labeling_job(LabelingJobName=job_name)