Building Pipelines with Amazon SageMaker Geospatial Capabilities




In this notebook, we will describe an example of how to build pipelines for automating the processing of geospatial data, using Amazon SageMaker geospatial capabilities and Amazon SageMaker Pipelines.



We will start by making sure the “sagemaker” SDK is up to date, and by importing the required libraries.

[ ]:
!pip install sagemaker --upgrade
[ ]:
import boto3
import sagemaker
import json
from datetime import datetime

We will now define a few variables for which we need to create sessions in the SageMaker and Boto3 SDKs.

We will also create the client for SageMaker geospatial capabilities with a Boto session…

[ ]:
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()  ### Replace with your own bucket if needed
role = sagemaker.get_execution_role(sagemaker_session)
sess = boto3.Session()
region = sess.region_name
prefix = "sm-geospatial-e2e"  ### Replace with the S3 prefix desired
print(f"S3 bucket: {bucket}")
print(f"Role: {role}")
print(f"Region: {region}")

Before we continue, make sure that your AWS IAM role has the proper permissions for interacting with AWS Lambda, Amazon S3, and Amazon SQS as required.

Also, make sure you have the proper policy and trust relationship added to your role for “sagemaker-geospatial”, as specified in the Get Started with Amazon SageMaker Geospatial Capabilities documentation.

For demo purposes, you can uncomment and run the following cell to add the policies and trust relationships to your role. For any production deployment, scope this policy down following the principle of least privilege.

[ ]:
#!python './code/adjust_role.py' {role}
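
As a rough illustration of what a trust relationship for the role could look like, the sketch below builds an IAM trust policy document as a plain dictionary. It is not the content of `adjust_role.py`. The helper `build_trust_policy` is hypothetical, and the two service principals are an assumption that should be checked against the Get Started documentation before use.

```python
import json

# Assumption: these are the service principals that must be able to assume the role.
# Verify them against the SageMaker geospatial Get Started documentation.
TRUSTED_SERVICES = ["sagemaker.amazonaws.com", "sagemaker-geospatial.amazonaws.com"]


def build_trust_policy(services):
    """Return an IAM trust policy document that lets the given services assume a role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": list(services)},
                "Action": "sts:AssumeRole",
            }
        ],
    }


trust_policy = build_trust_policy(TRUSTED_SERVICES)
print(json.dumps(trust_policy, indent=2))
```

The document is only printed here; attaching it to a role is left out on purpose so that nothing is changed in your account by running the sketch.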

Let’s now create an AWS Lambda function that calls the SageMaker geospatial APIs as required. We will use the same function for:

* Starting the Earth Observation Jobs (EOJs), Cloud Removal and Stacking in our example, using a Lambda Step in SageMaker Pipelines
* Checking the status of the EOJs, as these are asynchronous and take a few minutes to complete, using a Callback Step in SageMaker Pipelines

We will start by writing a script with our code.

[ ]:
%%writefile ./code/eoj_lambda.py
"""Script for calling SageMaker geospatial APIs as required"""
import json
import logging
import os
import boto3
import botocore
import ast
import time

logger = logging.getLogger()
logger.setLevel(os.getenv("LOGGING_LEVEL", logging.INFO))


def lambda_handler(event, context):
    """
    Manages SageMaker geospatial EOJs as required.
    """
    try:
        # Setup client...
        if "region" in event:
            region = event["region"]
        else:
            region = "us-west-2"
        gsClient = boto3.client("sagemaker-geospatial", region_name="us-west-2")

        if "eoj_name" in event:
            # Create a new EOJ...
            logger.debug(f"Will create EOJ with event:\n{json.dumps(event)}")
            if "RasterDataCollectionQuery" in str(event["eoj_input_config"]):
                # Input is a Raster Data Collection Query
                input_config = ast.literal_eval(event["eoj_input_config"])
            elif "arn" in str(event["eoj_input_config"]):
                # Input is chaining results of another EOJ
                input_config = {"PreviousEarthObservationJobArn": event["eoj_input_config"]}
            logger.info(f'Starting EOJ {event["eoj_name"]}')
            response = gsClient.start_earth_observation_job(
                Name=event["eoj_name"],
                ExecutionRoleArn=event["role"],
                InputConfig=input_config,
                JobConfig=ast.literal_eval(event["eoj_config"]),
            )
            logger.info(f'Create eoj_arn: {response["Arn"]}\n')
            time.sleep(3)

        elif "eoj_output_config" in event:
            # Export an EOJ...
            logger.debug(f"Will export EOJ with event:\n{json.dumps(event)}")
            logger.info(f'Exporting EOJ with Arn {event["eoj_arn"]}')
            response = gsClient.export_earth_observation_job(
                Arn=event["eoj_arn"],
                ExecutionRoleArn=event["role"],
                OutputConfig=ast.literal_eval(event["eoj_output_config"]),
            )
            logger.info(f'Export eoj_arn: {response["Arn"]}\n')

        elif "Records" in event:
            # Check status of previous EOJ...
            logger.debug(f"Will check status of EOJ with event:\n{json.dumps(event)}")
            for record in event["Records"]:
                payload = json.loads(record["body"])
                token = payload["token"]
                eoj_arn = payload["arguments"]["eoj_arn"]
                logger.info(f"Check EOJ or export with ARN: {eoj_arn}")
                response = gsClient.get_earth_observation_job(Arn=eoj_arn)
                if response["Status"] == "COMPLETED":
                    # EOJ is COMPLETED
                    logger.info("EOJ completed, resuming pipeline...")
                    sagemaker = boto3.client("sagemaker", region_name=region)
                    sagemaker.send_pipeline_execution_step_success(
                        CallbackToken=token,
                        OutputParameters=[{"Name": "eoj_status", "Value": response["Status"]}],
                    )
                elif response["Status"] == "SUCCEEDED":
                    # Export of EOJ SUCCEEDED
                    logger.info("Export EOJ succeeded, resuming pipeline...")
                    sagemaker = boto3.client("sagemaker", region_name=region)
                    sagemaker.send_pipeline_execution_step_success(
                        CallbackToken=token,
                        OutputParameters=[
                            {"Name": "export_eoj_status", "Value": response["Status"]}
                        ],
                    )
                elif response["Status"] == "FAILED":
                    logger.info("EOJ or export failed, stopping pipeline...")
                    sagemaker = boto3.client("sagemaker", region_name=region)
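                    sagemaker.send_pipeline_execution_step_failure(
                        CallbackToken=token,
                        # FailureReason must be a string, while ErrorDetails may be a structure
                        FailureReason=str(response.get("ErrorDetails", "EOJ or export failed")),
                    )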
                else:
                    # EOJ is still running IN_PROGRESS, we must check again later
                    # Note we must raise an exception so the message is put back in the SQS queue
                    logger.info(f'EOJ or export with status: {response["Status"]}')
                    raise Exception("EOJ or export still running...")
    except botocore.exceptions.ClientError as e:
        error_msg = f"EOJ or export call failed: {e.response['Error']['Code']}, {e.response['Error']['Message']}"
        raise Exception(error_msg)

    try:
        response
    except NameError:
        response = None

    if response is not None:
        logger.info(f'eoj_arn: {response["Arn"]}\n')
    else:
        response = {}
        response["Arn"] = ""

    return {"statusCode": 200, "eoj_arn": response["Arn"]}
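
The handler above picks its behaviour from the keys present in the incoming event. The sketch below isolates only that routing decision so it can be tried without calling any AWS API. The function name `classify_event` and the sample events are hypothetical and exist only for illustration.

```python
def classify_event(event):
    """Return the name of the handler branch that an event would take."""
    if "eoj_name" in event:
        return "start"  # start a new EOJ
    if "eoj_output_config" in event:
        return "export"  # export the results of an existing EOJ
    if "Records" in event:
        return "check_status"  # SQS-triggered status check
    return "ignored"  # no branch matches, nothing is called


# Hypothetical events, trimmed to the keys that drive the routing.
start_event = {"eoj_name": "cloudremoval-demo", "eoj_config": "{}"}
export_event = {"eoj_arn": "arn:example", "eoj_output_config": "{}"}
status_event = {"Records": []}

for sample in (start_event, export_event, status_event, {}):
    print(classify_event(sample))
```

The order of the checks matters: an event that carries both `eoj_name` and `eoj_output_config` is treated as a request to start a job, not to export one.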

We can now use this script for creating our Lambda function.

[ ]:
# Create the Lambda function...
import zipfile

with zipfile.ZipFile("./code/eoj_lambda.zip", "w") as zf:
    zf.write("./code/eoj_lambda.py", "eoj_lambda.py")

with open("./code/eoj_lambda.zip", "rb") as f:
    zipped_code = f.read()

lambda_client = boto3.client("lambda", region_name=region)

response = lambda_client.create_function(
    FunctionName="geospatial-lambda",
    Runtime="python3.9",
    Role=role,
    Handler="eoj_lambda.lambda_handler",
    Code=dict(ZipFile=zipped_code),
    Timeout=60,
    # Set up Lambda function environment variables
    Environment={
        "Variables": {"Name": "geospatial-lambda", "Environment": "prod", "LOGGING_LEVEL": "INFO"}
    },
)

function_arn = response["FunctionArn"]
print(f"Created Lambda function with ARN: {function_arn}")
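
The deployment package can also be built in memory instead of writing a zip file to disk first. The sketch below shows one way to do that with the standard library only. The helper `zip_source_in_memory` and the placeholder source string are hypothetical; in the notebook the real content would come from `./code/eoj_lambda.py`.

```python
import io
import zipfile


def zip_source_in_memory(source_code, arcname="eoj_lambda.py"):
    """Return the bytes of a zip archive holding one source file."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(arcname, source_code)
    return buffer.getvalue()


# Hypothetical placeholder standing in for the real Lambda script.
sample_source = 'def lambda_handler(event, context):\n    return {"statusCode": 200}\n'
zipped_bytes = zip_source_in_memory(sample_source)

# Read the archive back to confirm what it contains.
with zipfile.ZipFile(io.BytesIO(zipped_bytes)) as zf:
    archive_names = zf.namelist()
    restored_source = zf.read("eoj_lambda.py").decode("utf-8")

print(archive_names)
```

The name of the file inside the archive has to match the module part of the `Handler` value, which is `eoj_lambda` in the cell above.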

Now that we have a Lambda function for calling our EOJs, we can implement the Callback steps required in our pipeline.

Note again, we are using SageMaker Pipelines Callback Steps because our EOJs are asynchronous and take some time to complete. So we want the Lambda function to check the status of the EOJs, and resume the workflow when each EOJ is completed.

For this, we will create an Amazon SQS queue that will be used in our callback.

[ ]:
# Create SQS queue for handling async processes and callback
sqs_client = boto3.client("sqs", region_name=region)

sqs_client.create_queue(
    QueueName="geospatial-queue",
    Attributes={
        "VisibilityTimeout": "300",
        "DelaySeconds": "5",
        "ReceiveMessageWaitTimeSeconds": "5",
    },
)
queue_url = sqs_client.get_queue_url(QueueName="geospatial-queue")["QueueUrl"]
queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])[
    "Attributes"
]["QueueArn"]

print(f"Created queue:\n{queue_url}\n{queue_arn}\n")
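
The status check relies on a simple retry pattern: when the EOJ is still running, the Lambda function raises an exception, the message is not deleted, and it becomes visible in the queue again once the visibility timeout expires. The sketch below simulates that loop locally without any AWS call. The names `StillRunning`, `check_once` and `simulate_redelivery` are hypothetical.

```python
class StillRunning(Exception):
    """Raised to signal that the message should be retried later."""


def check_once(status):
    """Mimic one invocation of the status check for a given job status."""
    if status == "COMPLETED":
        return "success"
    if status == "FAILED":
        return "failure"
    raise StillRunning(status)


def simulate_redelivery(status_sequence):
    """Deliver the same message once per status until the check stops raising."""
    for attempt, status in enumerate(status_sequence, start=1):
        try:
            return check_once(status), attempt
        except StillRunning:
            # In SQS the message would reappear after the visibility timeout.
            continue
    return "still_pending", len(status_sequence)


result = simulate_redelivery(["IN_PROGRESS", "IN_PROGRESS", "COMPLETED"])
print(result)
```

With the queue attributes used above, each retry would be spaced by roughly the visibility timeout, so the timeout effectively sets the polling interval.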

Now we need to associate the SQS queue as an input trigger for our Lambda function. This way, whenever the Callback Step pushes a message to the queue, it runs our Lambda function to check the status of the EOJ. We do this by creating an Event Source Mapping.

[ ]:
# Link SQS queue with the geospatial-lambda function...
event_source_mapping = lambda_client.create_event_source_mapping(
    EventSourceArn=queue_arn, FunctionName="geospatial-lambda", Enabled=True
)
print(f'Mapping Lambda function and SQS queue through UUID: {event_source_mapping["UUID"]}')
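
Once the mapping is in place, the Lambda function receives events with a `Records` list, and each record carries the callback message in its `body`. The sketch below shows how the token and the EOJ ARN are read from such an event, using only the fields that the handler above relies on. The helper `extract_callbacks` and every value in the sample event are hypothetical, and a real message may contain more fields.

```python
import json


def extract_callbacks(event):
    """Return (token, eoj_arn) pairs for every SQS record in a Lambda event."""
    pairs = []
    for record in event.get("Records", []):
        payload = json.loads(record["body"])  # the body arrives as a JSON string
        pairs.append((payload["token"], payload["arguments"]["eoj_arn"]))
    return pairs


# Hypothetical event, trimmed to the fields used by the handler.
sample_body = {
    "token": "example-token",
    "arguments": {"eoj_arn": "arn:example:earth-observation-job/demo"},
}
sample_event = {"Records": [{"body": json.dumps(sample_body)}]}

callbacks = extract_callbacks(sample_event)
print(callbacks)
```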

We will now define the parameters to be used in our pipeline, as we want to be able to pass these dynamically whenever we run our geospatial pipeline.

[ ]:
from sagemaker.workflow.parameters import ParameterString

parameter_role = ParameterString(name="parameter_role", default_value=role)
parameter_region = ParameterString(name="parameter_region", default_value=region)
parameter_queue_url = ParameterString(name="parameter_queue_url", default_value=queue_url)
parameter_eoj_input_config = ParameterString(name="parameter_eoj_input_config", default_value="")
parameter_cr_eoj_config = ParameterString(name="parameter_cr_eoj_config", default_value="")
parameter_s_eoj_config = ParameterString(name="parameter_s_eoj_config", default_value="")
parameter_eoj_output_config = ParameterString(name="parameter_eoj_output_config", default_value="")
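
All of these parameters are strings, while the job configurations are nested dictionaries, so each configuration has to travel as text and be parsed again inside the Lambda function with `ast.literal_eval`. The sketch below checks that round trip on a sample configuration. It assumes the dictionary reaches the function either as its Python string form or as JSON, which is worth verifying for your SDK version.

```python
import ast
import json

# Sample configuration, shaped like the cloud removal settings used in this notebook.
config = {"CloudRemovalConfig": {"AlgorithmName": "INTERPOLATION", "InterpolationValue": "-9999"}}

# Both text forms parse back to the same dictionary for this configuration.
from_repr = ast.literal_eval(str(config))
from_json = ast.literal_eval(json.dumps(config))
print(from_repr == config, from_json == config)

# JSON booleans and nulls are not Python literals, so literal_eval rejects them.
flagged_json = json.dumps({"enabled": True})
try:
    ast.literal_eval(flagged_json)
    literal_eval_accepts_json_bool = True
except (ValueError, SyntaxError):
    literal_eval_accepts_json_bool = False

print(literal_eval_accepts_json_bool)
print(json.loads(flagged_json))
```

If a configuration ever contains booleans or null values, passing it as JSON and parsing it with `json.loads` is the safer combination.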

With all the elements in place, we can now start creating our steps with SageMaker Pipelines…

[ ]:
from sagemaker.workflow.lambda_step import LambdaStep, LambdaOutput, LambdaOutputTypeEnum
from sagemaker.lambda_helper import Lambda

step_lambda_cr = LambdaStep(
    name="CloudRemovalStep",
    lambda_func=Lambda(function_arn=function_arn),
    inputs={
        "role": parameter_role,
        "region": parameter_region,
        "eoj_input_config": parameter_eoj_input_config,
        "eoj_config": parameter_cr_eoj_config,
        "eoj_name": f'cloudremoval-{datetime.now().strftime("%Y-%m-%d-%H-%M")}',
    },
    outputs=[
        LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String),
        LambdaOutput(output_name="eoj_arn", output_type=LambdaOutputTypeEnum.String),
    ],
)
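
Note that the `eoj_name` above is an f-string, so it is evaluated once, when the step object is created, and the resulting name has minute resolution. The sketch below reproduces the naming scheme in a small helper to make that explicit. The function `make_eoj_name` is hypothetical and is not part of the SageMaker SDK.

```python
from datetime import datetime


def make_eoj_name(prefix, now=None):
    """Build a job name such as 'cloudremoval-2023-01-15-09-30'."""
    now = now or datetime.now()
    return f'{prefix}-{now.strftime("%Y-%m-%d-%H-%M")}'


# A fixed timestamp keeps the example reproducible.
fixed_time = datetime(2023, 1, 15, 9, 30)
example_name = make_eoj_name("cloudremoval", fixed_time)
print(example_name)
```

Because the name is fixed at definition time, executions started from the same pipeline definition reuse it until the step is defined again.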
[ ]:
from sagemaker.workflow.callback_step import CallbackStep, CallbackOutput, CallbackOutputTypeEnum

step_callback_cr = CallbackStep(
    name="CloudRemovalCallbackStep",
    depends_on=["CloudRemovalStep"],
    sqs_queue_url=parameter_queue_url,
    inputs={
        "role": parameter_role,
        "region": parameter_region,
        "eoj_arn": step_lambda_cr.properties.Outputs["eoj_arn"],
    },
    outputs=[
        CallbackOutput(output_name="eoj_status", output_type=CallbackOutputTypeEnum.String),
    ],
)
[ ]:
step_lambda_s = LambdaStep(
    name="StackingStep",
    depends_on=["CloudRemovalCallbackStep"],
    lambda_func=Lambda(function_arn=function_arn),
    inputs={
        "role": parameter_role,
        "region": parameter_region,
        "eoj_input_config": step_lambda_cr.properties.Outputs["eoj_arn"],
        "eoj_config": parameter_s_eoj_config,
        "eoj_name": f'stacking-{datetime.now().strftime("%Y-%m-%d-%H-%M")}',
    },
    outputs=[
        LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String),
        LambdaOutput(output_name="eoj_arn", output_type=LambdaOutputTypeEnum.String),
    ],
)
[ ]:
step_callback_s = CallbackStep(
    name="StackingCallbackStep",
    depends_on=["StackingStep"],
    sqs_queue_url=parameter_queue_url,
    inputs={
        "role": parameter_role,
        "region": parameter_region,
        "eoj_arn": step_lambda_s.properties.Outputs["eoj_arn"],
    },
    outputs=[
        CallbackOutput(output_name="statusJob", output_type=CallbackOutputTypeEnum.String),
    ],
)
[ ]:
step_lambda_ex = LambdaStep(
    name="ExportStep",
    depends_on=["StackingCallbackStep"],
    lambda_func=Lambda(function_arn=function_arn),
    inputs={
        "eoj_arn": step_lambda_s.properties.Outputs["eoj_arn"],
        "role": parameter_role,
        "region": parameter_region,
        "eoj_output_config": parameter_eoj_output_config,
    },
    outputs=[
        LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String),
        LambdaOutput(output_name="eoj_arn", output_type=LambdaOutputTypeEnum.String),
    ],
)
[ ]:
step_callback_ex = CallbackStep(
    name="ExportCallbackStep",
    depends_on=["ExportStep"],
    sqs_queue_url=parameter_queue_url,
    inputs={
        "role": parameter_role,
        "region": parameter_region,
        "eoj_arn": step_lambda_ex.properties.Outputs["eoj_arn"],
    },
    outputs=[
        CallbackOutput(output_name="statusJob", output_type=CallbackOutputTypeEnum.String),
    ],
)

Finally, we can define our pipeline based on the steps and parameters created before.

[ ]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "GeospatialPipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        parameter_role,
        parameter_region,
        parameter_queue_url,
        parameter_eoj_input_config,
        parameter_cr_eoj_config,
        parameter_s_eoj_config,
        parameter_eoj_output_config,
    ],
    steps=[
        step_lambda_cr,
        step_callback_cr,
        step_lambda_s,
        step_callback_s,
        step_lambda_ex,
        step_callback_ex,
    ],
)
[ ]:
definition = json.loads(pipeline.definition())
definition
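
The parsed definition is a nested dictionary, which can be long to read in full. The sketch below prints one line per step with its type and dependencies. It assumes the definition has a top-level `Steps` list whose entries carry `Name`, `Type` and an optional `DependsOn` key. The helper `summarize_steps` and the trimmed sample definition are hypothetical.

```python
def summarize_steps(definition):
    """Return (name, type, depends_on) tuples for each step in a pipeline definition."""
    return [
        (step["Name"], step["Type"], step.get("DependsOn", []))
        for step in definition.get("Steps", [])
    ]


# Hypothetical, heavily trimmed definition used only to exercise the helper.
sample_definition = {
    "Steps": [
        {"Name": "CloudRemovalStep", "Type": "Lambda"},
        {
            "Name": "CloudRemovalCallbackStep",
            "Type": "Callback",
            "DependsOn": ["CloudRemovalStep"],
        },
    ]
}

step_summary = summarize_steps(sample_definition)
for name, step_type, depends_on in step_summary:
    print(f"{name} ({step_type}) depends on: {depends_on}")
```

In the notebook, the same helper could be called on the `definition` variable from the cell above.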
[ ]:
pipeline.upsert(role_arn=role)

Let us test our pipeline by defining some values for our parameters and running an execution.

[ ]:
# Replace with the data collection of interest...
data_collection_arn = "arn:aws:sagemaker-geospatial:us-west-2:378778860802:raster-data-collection/public/nmqj48dcu3g7ayw8"

# Replace with the coordinates of interest...
coordinates = [
    [9.181602157004177, 53.14038825707946],
    [9.181602157004177, 52.30629767547948],
    [10.587520893823973, 52.30629767547948],
    [10.587520893823973, 53.14038825707946],
    [9.181602157004177, 53.14038825707946],
]
# Replace with the time-range of interest...
time_start = "2022-03-05T12:00:00Z"
time_end = "2022-03-15T12:00:00Z"

eoj_input_config = {
    "RasterDataCollectionQuery": {
        "RasterDataCollectionArn": data_collection_arn,
        "AreaOfInterest": {
            "AreaOfInterestGeometry": {"PolygonGeometry": {"Coordinates": [coordinates]}}
        },
        "TimeRangeFilter": {"StartTime": time_start, "EndTime": time_end},
        "PropertyFilters": {
            "Properties": [{"Property": {"EoCloudCover": {"LowerBound": 0, "UpperBound": 2}}}]
        },
    }
}

cr_eoj_config = {
    "CloudRemovalConfig": {"AlgorithmName": "INTERPOLATION", "InterpolationValue": "-9999"}
}

s_eoj_config = {
    "StackConfig": {
        "OutputResolution": {"Predefined": "HIGHEST"},
        "TargetBands": ["red", "green", "blue"],
    }
}

eoj_output_config = {"S3Data": {"S3Uri": f"s3://{bucket}/{prefix}/export/", "KmsKeyId": ""}}

execution = pipeline.start(
    parameters=dict(
        parameter_role=role,
        parameter_region=region,
        parameter_queue_url=queue_url,
        parameter_eoj_input_config=eoj_input_config,
        parameter_cr_eoj_config=cr_eoj_config,
        parameter_s_eoj_config=s_eoj_config,
        parameter_eoj_output_config=eoj_output_config,
    )
)
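
When replacing the coordinates with your own area of interest, a quick local check can catch malformed polygons before an execution is started. The sketch below verifies that the ring is closed, as in the polygon above where the first vertex is repeated at the end, and computes its bounding box. It assumes `[longitude, latitude]` ordering as in GeoJSON. The helpers `is_closed_ring` and `bounding_box` are hypothetical.

```python
def is_closed_ring(ring):
    """Return True if the ring has at least four positions and ends where it starts."""
    return len(ring) >= 4 and list(ring[0]) == list(ring[-1])


def bounding_box(ring):
    """Return (min_lon, min_lat, max_lon, max_lat) for a ring of [lon, lat] positions."""
    lons = [position[0] for position in ring]
    lats = [position[1] for position in ring]
    return (min(lons), min(lats), max(lons), max(lats))


# Same polygon as in the cell above.
ring = [
    [9.181602157004177, 53.14038825707946],
    [9.181602157004177, 52.30629767547948],
    [10.587520893823973, 52.30629767547948],
    [10.587520893823973, 53.14038825707946],
    [9.181602157004177, 53.14038825707946],
]

ring_is_closed = is_closed_ring(ring)
ring_bbox = bounding_box(ring)
print(ring_is_closed, ring_bbox)
```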

At this point, you can go to the SageMaker Resources tab in the left menu in Studio and check the Pipelines.

You should see our “GeospatialPipeline” in the list; double-click it to check the details of the execution.


Clean-up

Once done, uncomment and run the following cells to delete any resources that could incur costs.

[ ]:
# Delete the SQS queue
# sqs_client = boto3.client("sqs", region_name=region)
# sqs_client.delete_queue(
#    QueueUrl=sqs_client.get_queue_url(QueueName="geospatial-queue")["QueueUrl"]
# )
[ ]:
# Delete Lambda function
# lambda_client = boto3.client("lambda", region_name=region)
# lambda_client.delete_function(FunctionName="geospatial-lambda")
[ ]:
# Delete the SageMaker Pipeline
# sagemaker_client = boto3.client("sagemaker", region_name=region)
# sagemaker_client.delete_pipeline(PipelineName="GeospatialPipeline")
