Quick Start - Introducing @step Decorator and Pipeline Trigger
This notebook’s CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.
We’re introducing a low-code experience for data scientists to convert the Machine Learning (ML) development code into repeatable and reusable workflow steps of Amazon SageMaker Pipelines.
This sample notebook is a quick introduction to this capability, using dummy Python functions wrapped as pipeline steps. The pipeline in this notebook generates random integers and applies a statistic operation (avg or sum) to them.
Note: this notebook can only run on Python 3.8 or Python 3.10. Otherwise, you will get an error message prompting you to provide an image_uri when defining a step.
Install the dependencies
If you run the notebook from a local IDE outside of SageMaker, please follow the “AWS CLI Prerequisites” section of the Set Up Amazon SageMaker Prerequisites guide to set up AWS credentials.
Next, run the cell below to install all dependencies required by this notebook.
[ ]:
!pip install -r ./requirements.txt
Setup configuration file path
We need to set the directory in which the config.yaml file resides so that the step decorator can make use of the settings.
[ ]:
import os
# Set path to config file
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()
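For reference, a minimal config.yaml might configure defaults for the step decorator's remote execution. The sketch below is illustrative only; the instance type and other values are placeholders, and the authoritative settings are in the config.yaml shipped alongside this notebook.

```yaml
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        # Placeholder values for illustration; see the provided config.yaml
        InstanceType: ml.m5.xlarge
        Dependencies: ./requirements.txt
        IncludeLocalWorkDir: true
```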
Define pipeline steps
We can define a pipeline step by simply adding a step decorator on top of a custom function (i.e. the generate function below).
Invoking the generate function does not execute it at that moment. Rather, the function execution is delayed until pipeline execution time, when the step runs. Thus, the function returns a DelayedReturn object, which represents this step and can be used as the input of subsequent steps.
[ ]:
import random
from sagemaker.workflow.function_step import step
# The step name defaults to the function name (i.e. "generate"),
# appended with a UUID to make it unique
@step(keep_alive_period_in_seconds=300)
def generate():
    random_number = random.randint(0, 10)
    print(f"Generated random number: {random_number}")
    return random_number
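Conceptually, the decorator defers execution: calling the decorated function records the call and returns a placeholder instead of running the body. The following toy sketch illustrates this deferred-execution pattern; it is not the real SDK implementation, and all names in it (toy_step, ToyDelayedReturn) are invented for illustration.

```python
from functools import wraps


class ToyDelayedReturn:
    """Toy placeholder standing in for a step's future output."""

    def __init__(self, func, args):
        self.func, self.args = func, args

    def resolve(self):
        # In a real pipeline, resolution happens when the step actually runs;
        # upstream placeholders are resolved first, mirroring step dependencies.
        args = [a.resolve() if isinstance(a, ToyDelayedReturn) else a
                for a in self.args]
        return self.func(*args)


def toy_step(func):
    @wraps(func)
    def wrapper(*args):
        # Calling the function records the call instead of running the body
        return ToyDelayedReturn(func, args)
    return wrapper


@toy_step
def add(a, b):
    return a + b


delayed = add(1, add(2, 3))
print(type(delayed).__name__)  # ToyDelayedReturn
print(delayed.resolve())       # 6
```

Passing one placeholder as an argument to another decorated call is exactly how step dependencies are expressed later in this notebook.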
[ ]:
reduce_func_step_name = "reduce"


# Override the default step name.
# Users have to ensure custom step names are unique
@step(name=reduce_func_step_name, keep_alive_period_in_seconds=300)
def my_reduce(stat, number_1, number_2):
    if stat == "avg":
        return sum([number_1, number_2]) / 2.0
    elif stat == "sum":
        return sum([number_1, number_2])
    else:
        raise ValueError(f"Unsupported stat operation: {stat}")
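Because the decorator defers execution, my_reduce cannot be called directly to check its logic. An undecorated copy of the same logic can be sanity-checked locally; the helper below is purely for illustration and is not part of the pipeline.

```python
def reduce_logic(stat, number_1, number_2):
    # Same logic as my_reduce, without the @step decorator
    if stat == "avg":
        return sum([number_1, number_2]) / 2.0
    elif stat == "sum":
        return sum([number_1, number_2])
    else:
        raise ValueError(f"Unsupported stat operation: {stat}")


print(reduce_logic("avg", 4, 6))  # 5.0
print(reduce_logic("sum", 4, 6))  # 10
```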
Add extra dependencies using a conda environment.yml file
To run a function in a new conda environment, you can specify the path of an environment.yml file in the dependencies attribute as follows:

@step(
    dependencies="./environment.yml",
)
def my_multiply(dataframe: pd.DataFrame, factor: float):
    result = dataframe * factor
    print(f"multiply result: {result}")
    return result
Note: A sample environment.yml file has been provided along with this notebook.
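For orientation, a conda environment file for the my_multiply sketch above might look roughly like the following. This is an illustrative placeholder, not the sample file shipped with the notebook, and the pinned versions are arbitrary.

```yaml
name: step-env
dependencies:
  - python=3.10
  - pandas
```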
Define a pipeline with steps
After defining all the steps, we can group them into a pipeline.
Notes:

1. There's no need to put all the step delayed-return objects into the pipeline's steps list. Since we've defined the step dependencies via function dependencies, we only need to put the end step (i.e. the DelayedReturn object returned by my_reduce) into the list, and the pipeline object can automatically retrieve all its upstream steps.
2. The my_reduce function not only takes in the DelayedReturn objects from the previous steps, but also consumes a Parameter object, which determines the statistic operation to perform.
[ ]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterString
param = ParameterString(name="stat", default_value="avg")
pipeline_name = "CalculatorPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    steps=[my_reduce(param, generate(), generate())],
    parameters=[param],
)
Create the pipeline and run pipeline execution
[ ]:
import sagemaker
# Note: sagemaker.get_execution_role does not work outside of SageMaker;
# in that case, pass your execution role ARN explicitly instead
role = sagemaker.get_execution_role()
pipeline.upsert(role_arn=role)
[ ]:
execution = pipeline.start(parallelism_config=dict(MaxParallelExecutionSteps=10))
[ ]:
execution.describe()
[ ]:
execution.wait()
[ ]:
execution.list_steps()
Once the pipeline execution completes, we can retrieve the output of a step as follows. Note: only the output of a @step-decorated function can be retrieved via execution.result(). In other words, this does not work for the classic step types, e.g. ProcessingStep, TrainingStep, etc.
[ ]:
execution.result(step_name=reduce_func_step_name)
Parametrized Executions
In the cell below, we change the “stat” Parameter’s runtime value from the default “avg” to “sum” and re-run the pipeline execution.
[ ]:
execution_sum = pipeline.start(
parameters=dict(
stat="sum",
),
parallelism_config=dict(MaxParallelExecutionSteps=10),
)
[ ]:
execution_sum.wait()
[ ]:
execution_sum.list_steps()
[ ]:
execution_sum.result(step_name=reduce_func_step_name)
Schedule Pipeline Executions
The following cells show how to set up Pipeline Triggers that interact seamlessly with EventBridge Schedules. A PipelineSchedule automatically starts a pipeline execution either once or on an interval, depending on the type of schedule expression used.
To ensure that the pipeline scheduler works properly, make sure the following permissions are attached to the current execution role:

1. Attach adequate permissions (e.g. AmazonEventBridgeSchedulerFullAccess) to the role.
2. Establish a trust relationship with EventBridge by adding the service principal scheduler.amazonaws.com to the role's trust policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com",
                    ...
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
The cell below provides a simple sample demonstrating how to create a rate-based pipeline schedule by configuring the rate argument. It starts a pipeline execution immediately and recurs every 5 minutes.
[ ]:
from sagemaker.workflow.triggers import PipelineSchedule
schedule = PipelineSchedule(name="calculator-rate-schedule", rate=(5, "minutes"))
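Under the hood, a (value, unit) rate tuple corresponds to an EventBridge Scheduler rate expression such as rate(5 minutes). The tiny helper below illustrates that mapping; it is a hypothetical sketch for understanding, not part of the SageMaker SDK.

```python
def to_eventbridge_rate_expression(rate):
    """Map a (value, unit) tuple to an EventBridge rate expression string.

    Illustrative helper only; the SDK performs this translation internally.
    """
    value, unit = rate
    return f"rate({value} {unit})"


print(to_eventbridge_rate_expression((5, "minutes")))  # rate(5 minutes)
```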
Assigning triggers to a parent pipeline
A role with sufficient permissions to create and use pipeline triggers must be provided to the pipeline scheduler. It can either be passed in explicitly, as shown below, or be fetched from the intelligent defaults defined in the config.yaml file.
[ ]:
schedules = [schedule]
pipeline.put_triggers(triggers=schedules, role_arn=role)
[ ]:
pipeline.describe_trigger(trigger_name=schedule.name)
Clean up resources
Before deleting the pipeline, let's make sure to delete its triggers as well. The triggers can be deleted by specifying a list of trigger names.
Notes on clean-up limitations:

- The ability to delete triggers by specifying trigger names is only available in the SageMaker Python SDK. If you delete the pipeline via the CLI or the DeletePipeline API call, its triggers are not deleted. This can leave the triggers orphaned, and they will attempt to start executions for a non-existent pipeline.
- If you have already deleted the pipeline target without cleaning up its associated triggers (i.e. schedules), be sure to clean up the orphaned schedules via the scheduler CLI or the EventBridge console.
[ ]:
pipeline.delete_triggers(trigger_names=[s.name for s in schedules])
Finally, delete the pipeline only after its associated resources, e.g. triggers, have all been cleaned up and all pipeline executions have terminated.
[ ]:
from sagemaker.workflow.pipeline import _PipelineExecution

execution_responses = pipeline.list_executions()["PipelineExecutionSummaries"]
for execution_response in execution_responses:
    execution_arn = execution_response["PipelineExecutionArn"]
    execution = _PipelineExecution(arn=execution_arn)
    try:
        # Wait for any in-flight execution to reach a terminal state
        execution.wait()
    except Exception as e:
        print(e)

pipeline.delete()
Notebook CI Test Results
This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.
[ ]: