Quick Start - Introducing @step Decorator and Pipeline Trigger


We’re introducing a low-code experience that lets data scientists convert Machine Learning (ML) development code into repeatable and reusable workflow steps of Amazon SageMaker Pipelines.

This sample notebook is a quick introduction to this capability with dummy Python functions wrapped as pipeline steps. The pipeline in this notebook generates random integers and performs a statistical operation (avg or sum) on them.

Note: this notebook can only run on Python 3.8 or Python 3.10. On other versions, you will get an error message prompting you to provide an image_uri when defining a step.
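
You can confirm the kernel’s Python version with a quick optional check (nothing here is specific to this notebook’s code):

import sys

# This notebook expects Python 3.8 or 3.10; on other versions the @step decorator
# requires an explicit image_uri.
print(sys.version)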

Install the dependencies

If you run the notebook from a local IDE outside of SageMaker, please follow the “AWS CLI Prerequisites” section of the Set Up Amazon SageMaker Prerequisites guide to set up AWS credentials.

Next, run the cell below to install all dependencies required by this notebook.

[ ]:
!pip install -r ./requirements.txt

Set up the configuration file path

We need to set the directory in which the config.yaml file resides so that the step decorator can make use of the settings.

[ ]:
import os

# Set path to config file
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()
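
As an optional sanity check (assuming config.yaml sits next to this notebook, as in the sample repository), you can verify that the override actually points at the file:

from pathlib import Path

# Optional check: confirm config.yaml exists in the directory the override points to.
config_path = Path(os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"]) / "config.yaml"
print(f"Config file found: {config_path.is_file()} ({config_path})")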

Define pipeline steps

We can define a pipeline step by adding a @step decorator on top of a custom function (e.g. the generate function below).

Invoking the generate function does not execute it immediately. Instead, the function execution is delayed until pipeline execution time, when the step runs. The invocation therefore returns a DelayedReturn object, which represents this step and can be used as the input of subsequent steps.

[ ]:
import random
from sagemaker.workflow.function_step import step


# The step name defaults to be the function name (i.e. "generate")
# appended with a UUID to make it unique
@step(keep_alive_period_in_seconds=300)
def generate():
    random_number = random.randint(0, 10)
    print(f"Generated random number: {random_number}")
    return random_number
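
For instance (a minimal illustration; safe to run locally because nothing executes yet), invoking the decorated function returns a placeholder rather than an integer:

# Calling the decorated function does not run its body; it returns a DelayedReturn
# object representing the step's future output.
delayed = generate()
print(type(delayed))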
[ ]:
reduce_func_step_name = "reduce"


# Override the step name.
# Users have to ensure custom step name uniqueness
@step(name=reduce_func_step_name, keep_alive_period_in_seconds=300)
def my_reduce(stat, number_1, number_2):
    if stat == "avg":
        return sum([number_1, number_2]) / 2.0
    elif stat == "sum":
        return sum([number_1, number_2])
    else:
        raise ValueError(f"Unsupported stat: {stat}")

Add extra dependencies using a conda environment.yml file

To run a function in a new conda environment, you can pass the path of an environment.yml file to the dependencies attribute as follows:

@step(
    dependencies="./environment.yml",
)
def my_multiply(dataframe: pd.DataFrame, factor: float):  # assumes pandas is imported as pd
    result = dataframe * factor
    print(f"multiply result: {result}")
    return result

Note: A sample environment.yml file has been provided along with this notebook.
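
For reference, a minimal environment.yml might look like the following; this is an illustrative sketch, and the sample file shipped with the notebook may pin different packages:

name: step-env
channels:
  - conda-forge
dependencies:
  - python=3.10
  - pandas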

Define a pipeline with steps

After defining all the steps, we can group them into a pipeline.

Notes:

  1. There’s no need to put all the step DelayedReturn objects into the pipeline’s steps list. Since we’ve defined the step dependencies via function dependencies, we only need to put the end step (i.e. the DelayedReturn object returned by my_reduce) into the list, and the pipeline object can automatically retrieve all of its upstream steps.

  2. The my_reduce function not only takes in the DelayedReturn objects from the previous steps, but also consumes a Parameter object, which determines the statistical operation to perform.

[ ]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterString

param = ParameterString(name="stat", default_value="avg")

pipeline_name = "CalculatorPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    steps=[my_reduce(param, generate(), generate())],
    parameters=[param],
)
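
Optionally, you can inspect the step DAG that the SDK compiled from the function dependencies; pipeline.definition() returns the pipeline definition as a JSON string (note that compiling function steps may upload serialized code artifacts to your default S3 bucket):

import json

# Inspect the auto-generated definition: it should contain two "generate" steps
# pulled in as upstream dependencies of "reduce".
print(json.dumps(json.loads(pipeline.definition()), indent=2))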

Create the pipeline and run pipeline execution

[ ]:
import sagemaker

# Note: sagemaker.get_execution_role() does not work outside of SageMaker.
# If you run this notebook locally, set role to your execution role's ARN instead.
role = sagemaker.get_execution_role()
pipeline.upsert(role_arn=role)
[ ]:
execution = pipeline.start(parallelism_config=dict(MaxParallelExecutionSteps=10))
[ ]:
execution.describe()
[ ]:
execution.wait()
[ ]:
execution.list_steps()

Once the pipeline execution completes, we can retrieve the output of a step as follows. Note: only the output of a @step-decorated function can be retrieved via execution.result(). In other words, this does not work for the classic step types, e.g. ProcessingStep, TrainingStep, etc.

[ ]:
execution.result(step_name=reduce_func_step_name)

Parametrized Executions

In the cell below, we change the “stat” Parameter’s runtime value from the default “avg” to “sum” and re-run the pipeline execution.

[ ]:
execution_sum = pipeline.start(
    parameters=dict(
        stat="sum",
    ),
    parallelism_config=dict(MaxParallelExecutionSteps=10),
)
[ ]:
execution_sum.wait()
[ ]:
execution_sum.list_steps()
[ ]:
execution_sum.result(step_name=reduce_func_step_name)

Schedule Pipeline Executions

The following cells show how to set up Pipeline Triggers that interact seamlessly with EventBridge Schedules. A PipelineSchedule automatically starts a pipeline execution either one time or on an interval, depending on the type of schedule expression used.

To ensure that the pipeline scheduler works properly, make sure the following are configured on the current execution role:

  1. Attach adequate permissions (e.g. AmazonEventBridgeSchedulerFullAccess) to the role.

  2. Establish a trust relationship with EventBridge by adding the service principal scheduler.amazonaws.com to the role’s trust policy (a programmatic sketch follows the policy document below):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com",
                    ...
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
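
If you prefer to apply this trust relationship programmatically, here is a minimal boto3 sketch; the role name is a placeholder, and update_assume_role_policy overwrites the existing trust policy, so merge this document with your role’s current one in practice:

import json
import boto3

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": ["scheduler.amazonaws.com", "sagemaker.amazonaws.com"]
            },
            "Action": "sts:AssumeRole",
        }
    ],
}

iam = boto3.client("iam")
iam.update_assume_role_policy(
    RoleName="sagemaker-execution-role",  # placeholder: use your role's name
    PolicyDocument=json.dumps(trust_policy),
)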

The cell below demonstrates how to create a rate-based pipeline schedule by configuring the rate argument. It starts a pipeline execution immediately and recurs every 5 minutes.

[ ]:
from sagemaker.workflow.triggers import PipelineSchedule

schedule = PipelineSchedule(name="calculator-rate-schedule", rate=(5, "minutes"))
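
PipelineSchedule also supports one-time and cron-based schedules. The sketch below shows both; the expression values are illustrative (following the EventBridge Scheduler formats), and these schedules are not attached to the pipeline in this notebook:

from datetime import datetime

# Illustrative alternatives: a one-time ("at") schedule and a cron-based schedule.
at_schedule = PipelineSchedule(name="calculator-at-schedule", at=datetime(2030, 1, 1, 12, 0))
cron_schedule = PipelineSchedule(name="calculator-cron-schedule", cron="0 12 * * ? *")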

Assigning triggers to a parent pipeline

A role must be provided to the pipeline scheduler with sufficient permissions to create and use pipeline triggers. This can either be passed in explicitly, as shown below, or be fetched from the intelligent defaults defined in the config.yaml file.

[ ]:
schedules = [schedule]
pipeline.put_triggers(triggers=schedules, role_arn=role)
[ ]:
pipeline.describe_trigger(trigger_name=schedule.name)

Clean up resources

Before deleting the pipeline, let’s make sure to delete its triggers as well. Triggers can be deleted by specifying a list of trigger names.

Notes on clean up limitations:

  * Deleting triggers by specifying trigger names is only available in the SageMaker Python SDK. If you delete the pipeline via the CLI or the DeletePipeline API call, its triggers are not deleted. This can leave a trigger orphaned, and it will attempt to start executions for a non-existent pipeline.

  * If you have already deleted the pipeline target without cleaning up its associated triggers (i.e. schedules), be sure to clean up orphaned schedules via the scheduler CLI or the EventBridge console (see the sketch below).
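
For reference, a minimal sketch for deleting an orphaned schedule with the boto3 EventBridge Scheduler client; the schedule name is a placeholder:

import boto3

# Delete an orphaned EventBridge schedule by name.
scheduler = boto3.client("scheduler")
scheduler.delete_schedule(Name="calculator-rate-schedule")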

[ ]:
pipeline.delete_triggers(trigger_names=[s.name for s in schedules])

Finally, delete the pipeline only after its associated resources (e.g. triggers) have all been cleaned up and all pipeline executions have terminated.

[ ]:
from sagemaker.workflow.pipeline import _PipelineExecution

# Wait for any in-flight executions to reach a terminal state before deleting the pipeline
execution_responses = pipeline.list_executions()["PipelineExecutionSummaries"]
for execution_response in execution_responses:
    execution_arn = execution_response["PipelineExecutionArn"]
    execution = _PipelineExecution(arn=execution_arn)
    try:
        execution.wait()
    except Exception as e:
        print(e)

pipeline.delete()
