Recommendation Engine for E-Commerce Sales - Pipeline Mode

This notebook gives an overview of the techniques and services offered by SageMaker to build and deploy a personalized recommendation engine.

Dataset

The dataset for this demo comes from the UCI Machine Learning Repository. It contains all the transactions occurring between 01/12/2010 and 09/12/2011 for a UK-based and registered non-store online retailer. The company mainly sells unique all-occasion gifts. The following attributes are included in our dataset:

+ InvoiceNo: Invoice number. Nominal, a 6-digit integral number uniquely assigned to each transaction. If this code starts with the letter 'c', it indicates a cancellation.
+ StockCode: Product (item) code. Nominal, a 5-digit integral number uniquely assigned to each distinct product.
+ Description: Product (item) name. Nominal.
+ Quantity: The quantities of each product (item) per transaction. Numeric.
+ InvoiceDate: Invoice date and time. Numeric, the day and time when each transaction was generated.
+ UnitPrice: Unit price. Numeric, product price per unit in sterling.
+ CustomerID: Customer number. Nominal, a 5-digit integral number uniquely assigned to each customer.
+ Country: Country name. Nominal, the name of the country where each customer resides.

Citation: Daqing Chen, Sai Liang Sain, and Kun Guo, Data mining for the online retail industry: A case study of RFM model-based customer segmentation using data mining, Journal of Database Marketing and Customer Strategy Management, Vol. 19, No. 3, pp. 197–208, 2012 (Published online before print: 27 August 2012. doi: 10.1057/dbm.2012.17)
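
Before any modeling, it is worth a quick look at the raw file. A minimal sanity check, assuming the CSV has been downloaded to data/Online Retail.csv (the same path used later in this notebook):

[ ]:
# Peek at the raw transactions: row/column counts and the first few records
import pandas as pd

df_raw = pd.read_csv("data/Online Retail.csv")
print(df_raw.shape)
df_raw.head()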

## Solution Architecture

[Architecture diagram]

[ ]:
!pip install -U sagemaker==2.92.1 --quiet
[ ]:
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CreateModelStep, ProcessingStep, TrainingStep
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString
import datetime
import boto3
import time
import pandas as pd
from preprocessing import loadDataset
[ ]:
# Verify the pinned SDK version is active; compare numerically, since plain string
# comparison mis-orders versions. Restart the kernel after the install above if this fails.
assert tuple(int(v) for v in sagemaker.__version__.split(".")[:2]) >= (2, 92)
[ ]:
region = boto3.Session().region_name
boto3.setup_default_session(region_name=region)
boto_session = boto3.Session(region_name=region)

s3_client = boto3.client("s3", region_name=region)

sagemaker_boto_client = boto_session.client("sagemaker")
sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session, sagemaker_client=sagemaker_boto_client
)
sagemaker_role = sagemaker.get_execution_role()

bucket = sagemaker_session.default_bucket()

prefix = "personalization"

output_prefix = f"s3://{bucket}/{prefix}/output"

Define Estimator

First, the number of feature dimensions must be calculated, since it is a hyperparameter of the estimator. It is obtained by loading the dataset, cleaning and preprocessing it as defined in the first part of Recommendation Engine for E-Commerce Sales, and then counting the feature dimensions in the processed dataset.

[ ]:
# Clean the raw data: drop rows with no customer, trim description whitespace,
# aggregate quantities per (item, customer) pair, and keep net-positive purchases
df = pd.read_csv("data/Online Retail.csv")
df.dropna(subset=["CustomerID"], inplace=True)
df["Description"] = df["Description"].apply(lambda x: x.strip())
df = df.groupby(["StockCode", "Description", "CustomerID", "Country", "UnitPrice"])[
    "Quantity"
].sum()
df = df.loc[df > 0].reset_index()

# loadDataset (from preprocessing.py) converts the cleaned frame into a feature matrix
X, y = loadDataset(df)
input_dims = X.shape[1]
input_dims
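
preprocessing.py (which provides loadDataset) is not reproduced in this notebook. As a rough sketch of what it could look like for a factorization machine, here is an assumed implementation that one-hot encodes customers and items into a sparse design matrix; the encoder choice and target definition are assumptions, not the actual script:

# Hypothetical sketch of loadDataset from preprocessing.py (assumed, not the real script)
from sklearn.preprocessing import OneHotEncoder


def loadDataset(df):
    # One indicator column per customer and per item, as factorization machines expect
    enc = OneHotEncoder(handle_unknown="ignore")
    X = enc.fit_transform(df[["CustomerID", "StockCode"]]).astype("float32")
    # Regression target: the aggregated purchase quantity
    y = df["Quantity"].values.astype("float32")
    return X, y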

With the data-dependent hyperparameter (feature_dim) calculated, the estimator can be created.

[ ]:
container = sagemaker.image_uris.retrieve("factorization-machines", region=boto_session.region_name)

fm = sagemaker.estimator.Estimator(
    container,
    sagemaker_role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    output_path=output_prefix,
    sagemaker_session=sagemaker_session,
)

fm.set_hyperparameters(
    feature_dim=input_dims,
    predictor_type="regressor",
    mini_batch_size=1000,
    num_factors=64,
    epochs=20,
)

Build Pipeline

Now that we are comfortable with the model we built, the same workflow can be codified as a SageMaker Pipeline. First, upload the raw dataset to S3 and expose its location as a pipeline parameter.

[ ]:
base_uri = f"s3://{bucket}/data"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path="data/Online Retail.csv", desired_s3_uri=base_uri
)

input_data = ParameterString(name="InputData", default_value=input_data_uri)
[ ]:
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
[ ]:
create_dataset_script_uri = f"s3://{bucket}/{prefix}/code/preprocessing.py"
s3_client.upload_file(
    Filename="preprocessing.py", Bucket=bucket, Key=f"{prefix}/code/preprocessing.py"
)

sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-retail-sales-process",
    role=sagemaker_role,
)

create_dataset_step = ProcessingStep(
    name="PreprocessData",
    processor=sklearn_processor,
    inputs=[
        sagemaker.processing.ProcessingInput(
            source=input_data, destination="/opt/ml/processing/input"
        ),
    ],
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="train_data", source="/opt/ml/processing/output/train"
        ),
        sagemaker.processing.ProcessingOutput(
            output_name="test_data", source="/opt/ml/processing/output/test"
        ),
    ],
    code=create_dataset_script_uri,
)
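
For context on the paths above: the processing container downloads the S3 input to /opt/ml/processing/input and uploads whatever the script writes under the declared output sources back to S3. A sketch of an entry point following those conventions (assumed; the actual preprocessing.py is the file uploaded above):

# Hypothetical __main__ section of preprocessing.py (assumed)
import os

import pandas as pd

if __name__ == "__main__":
    input_dir = "/opt/ml/processing/input"
    train_dir = "/opt/ml/processing/output/train"
    test_dir = "/opt/ml/processing/output/test"
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)

    # Clean and featurize exactly as in the cells above, then split and
    # serialize train/test sets into the two output directories
    df = pd.read_csv(os.path.join(input_dir, "Online Retail.csv"))
    # ... cleaning, loadDataset(df), train/test split, and serialization go here ...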
[ ]:
train_step = TrainingStep(
    name="TrainingStep",
    estimator=fm,
    inputs={
        "train": sagemaker.inputs.TrainingInput(
            s3_data=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                "train_data"
            ].S3Output.S3Uri
        ),
        "test": sagemaker.inputs.TrainingInput(
            s3_data=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                "test_data"
            ].S3Output.S3Uri
        ),
    },
)
[ ]:
model = sagemaker.model.Model(
    name="retail-personalization-factorization-machine",
    image_uri=container,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=sagemaker_role,
)

inputs = sagemaker.inputs.CreateModelInput(instance_type="ml.m4.xlarge")

create_model_step = CreateModelStep(name="CreateModel", model=model, inputs=inputs)
[ ]:
timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M")
mpg_name = f"retail-recommendation-{timestamp}"

register_step = RegisterModel(
    name="RegisterModel",
    estimator=fm,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/x-recordio-protobuf", "application/json"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=mpg_name,
    approval_status=model_approval_status,
)
[ ]:
s3_client.upload_file(Filename="deploy.py", Bucket=bucket, Key=f"{prefix}/code/deploy.py")
deploy_script_uri = f"s3://{bucket}/{prefix}/code/deploy.py"

deployment_processor = SKLearnProcessor(
    framework_version="1.0-1",
    role=sagemaker_role,
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name=f"{prefix}-deploy",
    sagemaker_session=sagemaker_session,
)

deploy_step = ProcessingStep(
    name="DeployModel",
    processor=deployment_processor,
    job_arguments=[
        "--model-name",
        create_model_step.properties.ModelName,
        "--region",
        region,
        "--endpoint-instance-type",
        "ml.m4.xlarge",
        "--endpoint-name",
        "retail-recommendation-endpoint",
    ],
    code=deploy_script_uri,
)
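
deploy.py is uploaded above but its contents are not shown in this notebook. A minimal sketch of what such a script might contain, assuming it simply parses the job_arguments passed to the step and calls the SageMaker API (the endpoint-config naming is an assumption):

# Hypothetical sketch of deploy.py (assumed; the real script is the one uploaded above)
import argparse

import boto3

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-name", type=str)
    parser.add_argument("--region", type=str)
    parser.add_argument("--endpoint-instance-type", type=str)
    parser.add_argument("--endpoint-name", type=str)
    args = parser.parse_args()

    sm = boto3.client("sagemaker", region_name=args.region)

    # Endpoint config pointing at the model produced by the CreateModel step
    config_name = f"{args.endpoint_name}-config"
    sm.create_endpoint_config(
        EndpointConfigName=config_name,
        ProductionVariants=[
            {
                "VariantName": "AllTraffic",
                "ModelName": args.model_name,
                "InstanceType": args.endpoint_instance_type,
                "InitialInstanceCount": 1,
            }
        ],
    )

    # Create the real-time endpoint (asynchronous; it reports InService when ready)
    sm.create_endpoint(EndpointName=args.endpoint_name, EndpointConfigName=config_name)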
[ ]:
pipeline_name = "PersonalizationDemo"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[input_data, model_approval_status],
    steps=[create_dataset_step, train_step, create_model_step, register_step, deploy_step],
)
[ ]:
pipeline.upsert(role_arn=sagemaker_role)
[ ]:
start_response = pipeline.start()
start_response.wait()
start_response.describe()
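
Pipeline parameters can also be overridden for an individual execution. For example, to start a run whose model package is registered as already approved, pass the override as a dict:

[ ]:
# Optional: start another execution, overriding the ModelApprovalStatus parameter
pipeline.start(parameters={"ModelApprovalStatus": "Approved"})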


[ ]:
from sagemaker.lineage.visualizer import LineageTableVisualizer
from pprint import pprint


# Show the lineage of artifacts associated with each pipeline step
viz = LineageTableVisualizer(sagemaker_session)
for execution_step in reversed(start_response.list_steps()):
    pprint(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)
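
Once the DeployModel step completes and the endpoint reaches InService, it can be queried. A hedged example using the sparse JSON request format that SageMaker factorization machines accept; the two active feature indices here are illustrative placeholders, not real customer/item columns:

[ ]:
import json

runtime = boto_session.client("sagemaker-runtime")

# Activate one customer column and one item column in the one-hot feature space
payload = {
    "instances": [
        {"data": {"features": {"keys": [0, 42], "shape": [input_dims], "values": [1, 1]}}}
    ]
}
response = runtime.invoke_endpoint(
    EndpointName="retail-recommendation-endpoint",
    ContentType="application/json",
    Body=json.dumps(payload),
)
print(json.loads(response["Body"].read()))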