Portfolio Churn Prediction with Amazon SageMaker AutoPilot and Neo4j

This notebook describes how to use Neo4j and SageMaker together. In it you connect to a Neo4j instance, load data, and compute a graph embedding. You then export that data to Amazon S3. Finally, you use SageMaker to train a model using the new embedding as an additional feature.

The data set represents a binary classification problem based on data from the SEC’s EDGAR database. It was scraped from the EDGAR system using the code here. The data set consists of Form 13 data, the quarterly filings of asset managers with $100M or more of assets under management (AUM).

Important: This example notebook is for demonstrative purposes only. It is not financial advice and should not be relied on as financial or investment advice.

Deploy Neo4j

You’re going to need a Neo4j deployment to run this lab. The easiest way to get that is via the AWS Marketplace. Select “Neo4j Enterprise Edition” and deploy that. Suggested parameters are:

  • Stack name - neo4j-ee

  • Graph Database Version - 5.1.0

  • Install Graph Data Science - True

  • Graph Data Science License Key - None

  • Install Bloom - False

  • Bloom License Key - None

  • Password - Enter something here

  • Number of Servers - 1

  • Instance Type - r6i.8xlarge

  • Disk Size - 100

  • SSH CIDR - 0.0.0.0/0

The Marketplace listing deploys an Auto Scaling Group (ASG) with a Network Load Balancer (NLB) in front of it. When deployment is complete, get the DNS name of the NLB from the console and use it to connect. You can view deployed NLBs on the Load Balancers page of the Amazon EC2 console.

If you need to change any parameters after you’ve deployed, you’ll want to delete the stack and redeploy rather than attempting to update the stack.

Using the Neo4j API

Now that we have a Neo4j deployment, let’s connect to it. First, install the Python client for Neo4j Graph Data Science.

[ ]:
%pip install graphdatascience

Now, you’re going to need the connection string and credentials from the deployment you created above.

[ ]:
# Edit these variables!
DB_URL = "neo4j://<XXX-nlb-XXX.elb.XXX>.amazonaws.com:7687"
DB_PASS = "<your-password>"

# You can leave this default
DB_USER = "neo4j"
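If you copied the NLB DNS name straight from the console, a small helper can assemble DB_URL for you. This is a minimal sketch: the function name neo4j_uri is hypothetical, and it assumes you connect with the neo4j:// scheme on 7687, the default Bolt port used in the cell above. Any scheme or port pasted in with the DNS name is discarded.

```python
def neo4j_uri(nlb_dns_name: str, port: int = 7687) -> str:
    """Build a neo4j:// connection URI from a load balancer DNS name (sketch)."""
    host = nlb_dns_name.strip()
    # Drop any scheme that was pasted in along with the DNS name
    for scheme in ("neo4j://", "bolt://", "http://", "https://"):
        if host.startswith(scheme):
            host = host[len(scheme):]
    # Drop a trailing slash and any port that was already present
    host = host.rstrip("/").split(":")[0]
    return f"neo4j://{host}:{port}"


# Usage (placeholder DNS name):
# DB_URL = neo4j_uri("<your-nlb-dns-name>")
```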
[ ]:
from graphdatascience import GraphDataScience

gds = GraphDataScience(DB_URL, auth=(DB_USER, DB_PASS))

Load Data into Neo4j

Now that we’ve got our connection object, let’s load the dataset into Neo4j.

The dataset is pulled from the SEC’s EDGAR database. These are public filings of something called Form 13. Asset managers with over $100M AUM are required to submit Form 13 quarterly, and the filings are then made available to the public over HTTP. The CSV files loaded below were pulled from EDGAR using the Python scripts mentioned above. We’ve filtered the data to include only filings over $10M in value.

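We’re going to create constraints for our data. A node key constraint requires the listed properties to exist and to be unique in combination, and it creates a backing index that speeds up the MERGE statements in the load queries below.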

[ ]:
result = gds.run_cypher(
    "CREATE CONSTRAINT IF NOT EXISTS FOR (p:Company) REQUIRE (p.cusip) IS NODE KEY;"
)
display(result)

result = gds.run_cypher(
    "CREATE CONSTRAINT IF NOT EXISTS FOR (p:Manager) REQUIRE (p.filingManager) IS NODE KEY;"
)
display(result)

result = gds.run_cypher(
    "CREATE CONSTRAINT IF NOT EXISTS FOR (p:Holding) REQUIRE (p.filingManager, p.cusip, p.reportCalendarOrQuarter) IS NODE KEY;"
)
display(result)

Now let’s load the nodes.

[ ]:
result = gds.run_cypher(
    """
        LOAD CSV WITH HEADERS FROM "https://neo4j-dataset.s3.amazonaws.com/form13/2021.csv" AS row
        MERGE (c:Company {cusip:row.cusip})
        ON CREATE SET
            c.nameOfIssuer=row.nameOfIssuer
    """
)
display(result)
[ ]:
result = gds.run_cypher(
    """
        LOAD CSV WITH HEADERS FROM "https://neo4j-dataset.s3.amazonaws.com/form13/2021.csv" AS row
        MERGE (m:Manager {filingManager:row.filingManager})
    """
)
display(result)
[ ]:
result = gds.run_cypher(
    """
        LOAD CSV WITH HEADERS FROM "https://neo4j-dataset.s3.amazonaws.com/form13/2021.csv" AS row
        MERGE (h:Holding {filingManager:row.filingManager, cusip:row.cusip, reportCalendarOrQuarter:row.reportCalendarOrQuarter})
        ON CREATE SET
            h.value=row.value,
            h.shares=row.shares,
            h.target=row.target,
            h.nameOfIssuer=row.nameOfIssuer
    """
)
display(result)

Now let’s create relationships between those nodes.

[ ]:
result = gds.run_cypher(
    """
        LOAD CSV WITH HEADERS FROM "https://neo4j-dataset.s3.amazonaws.com/form13/2021.csv" AS row
        MATCH (m:Manager {filingManager:row.filingManager})
        MATCH (h:Holding {filingManager:row.filingManager, cusip:row.cusip, reportCalendarOrQuarter:row.reportCalendarOrQuarter})
        MERGE (m)-[r:OWNS]->(h)
    """
)
display(result)
[ ]:
result = gds.run_cypher(
    """
        LOAD CSV WITH HEADERS FROM "https://neo4j-dataset.s3.amazonaws.com/form13/2021.csv" AS row
        MATCH (h:Holding {filingManager:row.filingManager, cusip:row.cusip, reportCalendarOrQuarter:row.reportCalendarOrQuarter})
        MATCH (c:Company {cusip:row.cusip})
        MERGE (h)-[r:PARTOF]->(c)
    """
)
display(result)
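The load cells above use MERGE throughout, so rerunning them does not create duplicates, and they produce a simple chain: each Manager OWNS one Holding per company per quarter, and each Holding is PARTOF exactly one Company. As an illustration only, here is a pure-Python sketch that mimics those MERGE statements with sets and dicts. The rows in TOY_ROWS are hypothetical, not real filings, and only a subset of the Holding properties is kept.

```python
# Hypothetical toy rows shaped like the Form 13 CSV (not real filings)
TOY_ROWS = [
    {"filingManager": "Manager A", "cusip": "000000001", "nameOfIssuer": "Issuer X",
     "reportCalendarOrQuarter": "03-31-2021", "value": "100", "shares": "10"},
    {"filingManager": "Manager A", "cusip": "000000001", "nameOfIssuer": "Issuer X",
     "reportCalendarOrQuarter": "06-30-2021", "value": "120", "shares": "12"},
    {"filingManager": "Manager B", "cusip": "000000001", "nameOfIssuer": "Issuer X",
     "reportCalendarOrQuarter": "03-31-2021", "value": "50", "shares": "5"},
]


def build_graph(rows):
    """Mimic the MERGE-based load with Python sets and dicts (illustration only)."""
    companies = {}    # cusip -> nameOfIssuer
    managers = set()  # filingManager
    holdings = {}     # (filingManager, cusip, quarter) -> properties
    owns = set()      # (filingManager, holding key)
    part_of = set()   # (holding key, cusip)
    for row in rows:
        # MERGE (c:Company {cusip}) ON CREATE SET c.nameOfIssuer = ...
        companies.setdefault(row["cusip"], row["nameOfIssuer"])
        # MERGE (m:Manager {filingManager})
        managers.add(row["filingManager"])
        # MERGE (h:Holding {filingManager, cusip, reportCalendarOrQuarter}) ON CREATE SET ...
        key = (row["filingManager"], row["cusip"], row["reportCalendarOrQuarter"])
        holdings.setdefault(key, {"value": row["value"], "shares": row["shares"]})
        # MERGE (m)-[:OWNS]->(h) and MERGE (h)-[:PARTOF]->(c)
        owns.add((row["filingManager"], key))
        part_of.add((key, row["cusip"]))
    return companies, managers, holdings, owns, part_of
```

Feeding the same row in twice leaves the counts unchanged, which is the property that makes the real load cells safe to rerun.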

Graph Data Science

Now we’re going to use Neo4j Graph Data Science to create an in-memory graph representation of the data. We’ll enhance that representation with features we engineer using a graph embedding.

[ ]:
result = gds.run_cypher(
    """
    CALL gds.graph.project(
      "mygraph",
      ["Company", "Manager", "Holding"],
      {
          OWNS: {orientation: "UNDIRECTED"},
          PARTOF: {orientation: "UNDIRECTED"}
      }
    )
    YIELD
      graphName AS graph,
      relationshipProjection AS readProjection,
      nodeCount AS nodes,
      relationshipCount AS rels
  """
)
display(result)

If you get an error saying the graph already exists, that’s probably because you ran this code before. You can drop the existing projection using this command:

[ ]:
# result = gds.run_cypher(
#  """
#    CALL gds.graph.drop("mygraph")
#  """
# )
# display(result)

Now, let’s list the details of the graph to make sure the projection was created as we want.

[ ]:
result = gds.run_cypher(
    """
    CALL gds.graph.list()
  """
)
display(result)

Now we can generate an embedding from that graph. This is a new feature we can use in our predictions. We’re using FastRP (Fast Random Projection), which its authors report is much faster than random-walk methods such as Node2Vec while producing embeddings of comparable quality. You can learn more about it on the Fast Random Projection documentation page.

FastRP has a number of parameters we could adjust. One of the most obvious is embeddingDimension, the length of the vector computed for each node; here we use 16. The documentation covers the others.
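To give a feel for what FastRP computes, here is a simplified NumPy sketch of the random projection idea: give every node a very sparse random vector, repeatedly average each node's vector over its neighbors, L2-normalize each intermediate result, and sum the intermediate results with per-iteration weights. This is an illustration under stated assumptions, not the GDS implementation, which differs in details. The function name fastrp_sketch is hypothetical; its parameters mirror embeddingDimension, randomSeed and iterationWeights from the GDS call below, and the sparsity constant s = 3 and the weights (0.0, 1.0, 1.0) are assumptions.

```python
import numpy as np


def fastrp_sketch(adjacency, embedding_dimension=16, iteration_weights=(0.0, 1.0, 1.0),
                  random_seed=1):
    """Simplified FastRP-style node embedding on a dense adjacency matrix (illustration)."""
    adjacency = np.asarray(adjacency, dtype=float)
    n = adjacency.shape[0]
    rng = np.random.default_rng(random_seed)

    # Very sparse random projection: +sqrt(s) or -sqrt(s) with prob 1/(2s) each, else 0
    s = 3.0
    initial = rng.choice(
        [np.sqrt(s), 0.0, -np.sqrt(s)],
        size=(n, embedding_dimension),
        p=[1 / (2 * s), 1 - 1 / s, 1 / (2 * s)],
    )

    # Row-normalize the adjacency matrix so each step averages over neighbors;
    # isolated nodes (degree 0) get an all-zero row
    degree = adjacency.sum(axis=1, keepdims=True)
    transition = np.divide(adjacency, degree, out=np.zeros_like(adjacency), where=degree > 0)

    embedding = np.zeros((n, embedding_dimension))
    current = initial
    for weight in iteration_weights:
        # Propagate one hop, then L2-normalize each node's intermediate vector
        current = transition @ current
        norms = np.linalg.norm(current, axis=1, keepdims=True)
        current = np.divide(current, norms, out=np.zeros_like(current), where=norms > 0)
        embedding += weight * current
    return embedding
```

Nodes with identical neighborhoods (for example, the leaves of a star) end up with identical embeddings, which is why the embedding captures structural position in the graph. Fixing the random seed makes the result reproducible, as randomSeed does in the GDS call.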

[ ]:
result = gds.run_cypher(
    """
  CALL gds.fastRP.mutate("mygraph",{
    embeddingDimension: 16,
    randomSeed: 1,
    mutateProperty:"embedding"
  })
  """
)
display(result)

That creates an embedding for every node in the projection, whatever its label. However, we only want the embedding on the Holding nodes.

The mutate mode stores the embedding only in the in-memory projection, so we’re going to take the embedding from our projection and write it to the Holding nodes in the underlying database.

[ ]:
result = gds.run_cypher(
    """
    CALL gds.graph.writeNodeProperties("mygraph", ["embedding"], ["Holding"])
    YIELD writeMillis
  """
)
display(result)
[ ]:
result = gds.run_cypher(
    """
    MATCH (n:Holding) RETURN n
  """
)
display(result)

Note that the query above takes 2-3 minutes to run, as it retrieves nearly half a million nodes along with all their properties and our new embedding.

[ ]:
import pandas as pd

df = pd.DataFrame([dict(record.items()) for record in result["n"]])
df

Note that the embedding column holds an array. To make this dataset more consumable, we flatten it into multiple individual features: embedding_0, embedding_1, … embedding_n.

[ ]:
embeddings = pd.DataFrame(df["embedding"].values.tolist()).add_prefix("embedding_")
merged = df.drop(columns=["embedding"]).merge(embeddings, left_index=True, right_index=True)
merged

Now that we have the data formatted properly, let’s split it into training, testing and validation sets. We’ll write those to disk.

Our data is, in some sense, a time series, so we split it by quarter, windowing over three quarters. Q4 of 2021 is used to generate labels, so it’s not present in the data set. That leaves Q3 as our validation set, Q2 as our test set, and Q1 for training.

We take this approach rather than generating random folds to avoid time-based leakage: with random folds, the model could train on holdings from later quarters and then be evaluated on earlier ones, which overstates how well it predicts the future.
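A cheap guard against accidental leakage is to check that every row in an earlier split strictly precedes every row in the later one. The sketch below assumes the reportCalendarOrQuarter values use the MM-DD-YYYY format seen in the filters below; the helper name assert_time_ordered is hypothetical. Dates are parsed rather than compared as strings, because MM-DD-YYYY strings do not sort chronologically across years.

```python
import pandas as pd


def assert_time_ordered(earlier, later, column="reportCalendarOrQuarter", fmt="%m-%d-%Y"):
    """Raise ValueError unless every row of `earlier` is strictly before every row of `later`."""
    latest_earlier = pd.to_datetime(earlier[column], format=fmt).max()
    earliest_later = pd.to_datetime(later[column], format=fmt).min()
    if not latest_earlier < earliest_later:
        raise ValueError(
            f"time leakage: {latest_earlier.date()} is not before {earliest_later.date()}"
        )
    return latest_earlier, earliest_later


# Usage after the split below:
# assert_time_ordered(train, test)
# assert_time_ordered(test, validate)
```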

[ ]:
df = merged

train = df.loc[df["reportCalendarOrQuarter"] == "03-31-2021"]
train.to_csv("train.csv", index=False)

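# Test and validation rows are inputs for inference, so drop the label and
# write them without a header row (AutoPilot inference expects headerless CSV)
test = df.loc[df["reportCalendarOrQuarter"] == "06-30-2021"]
test = test.drop(["target"], axis=1)
test.to_csv("test.csv", index=False, header=False)

validate = df.loc[df["reportCalendarOrQuarter"] == "09-30-2021"]
validate = validate.drop(["target"], axis=1)
validate.to_csv("validate.csv", index=False, header=False)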

SageMaker Connection

Let’s set up our SageMaker connection.

[ ]:
import sagemaker
import boto3

region = boto3.Session().region_name

session = sagemaker.Session()
bucket = session.default_bucket()
prefix = "sagemaker/form13"

role = sagemaker.get_execution_role()

sm = boto3.Session().client(service_name="sagemaker", region_name=region)

Upload to Amazon S3

Now we’re going to upload the training, test, and validation data to our default SageMaker bucket.

[ ]:
train_data_s3_path = session.upload_data(path="train.csv", key_prefix=prefix + "/train")
print("Training data uploaded to: " + train_data_s3_path)

test_data_s3_path = session.upload_data(path="test.csv", key_prefix=prefix + "/test")
print("Testing data uploaded to: " + test_data_s3_path)

validation_data_s3_path = session.upload_data(path="validate.csv", key_prefix=prefix + "/validate")
print("Validation data uploaded to: " + validation_data_s3_path)

Setting up the SageMaker AutoPilot Job

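After uploading the dataset to Amazon S3, you can invoke AutoPilot to find the best ML pipeline to train a model on this dataset. In the configuration below, MaxCandidates caps the number of candidate pipelines at 3 to keep the run short, and TargetAttributeName tells AutoPilot which column holds the label.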

[ ]:
auto_ml_job_config = {"CompletionCriteria": {"MaxCandidates": 3}}

input_data_config = [
    {
        "DataSource": {
            "S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": "s3://{}/{}/train".format(bucket, prefix),
            }
        },
        "TargetAttributeName": "target",
    }
]

output_data_config = {"S3OutputPath": "s3://{}/{}/output".format(bucket, prefix)}

Launching the SageMaker AutoPilot Job

You can now launch the AutoPilot job by calling the create_auto_ml_job method.

[ ]:
from time import gmtime, strftime, sleep

timestamp_suffix = strftime("%d-%H-%M-%S", gmtime())

auto_ml_job_name = "automl-form13-" + timestamp_suffix
print("AutoMLJobName: " + auto_ml_job_name)

sm.create_auto_ml_job(
    AutoMLJobName=auto_ml_job_name,
    InputDataConfig=input_data_config,
    OutputDataConfig=output_data_config,
    AutoMLJobConfig=auto_ml_job_config,
    RoleArn=role,
)

Tracking SageMaker AutoPilot job progress

A SageMaker AutoPilot job consists of the following high-level steps:

  • Analyzing Data, where the dataset is analyzed and AutoPilot comes up with a list of ML pipelines that should be tried out on the dataset. The dataset is also split into train and validation sets.

  • Feature Engineering, where AutoPilot performs feature transformation on individual features of the dataset as well as at an aggregate level.

  • Model Tuning, where the top performing pipeline is selected along with the optimal hyperparameters for the training algorithm (the last stage of the pipeline).

This job typically takes 20-80 minutes to run. The exact time depends on the algorithms AutoPilot tries and on how long its components take to provision.

[ ]:
print("JobStatus - Secondary Status")
print("----------------------------")

describe_response = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)
print(describe_response["AutoMLJobStatus"] + " - " + describe_response["AutoMLJobSecondaryStatus"])
job_run_status = describe_response["AutoMLJobStatus"]

while job_run_status not in ("Failed", "Completed", "Stopped"):
    describe_response = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)
    job_run_status = describe_response["AutoMLJobStatus"]

    print(
        describe_response["AutoMLJobStatus"] + " - " + describe_response["AutoMLJobSecondaryStatus"]
    )
    sleep(30)
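This loop and the transform-job loop later in the notebook follow the same pattern, and neither gives up if a job hangs. A reusable helper with a timeout might look like the sketch below. The name wait_for_status is hypothetical; it takes any zero-argument function that returns a describe response, so it works with both describe_auto_ml_job and describe_transform_job, and the 3-hour default timeout is an assumption.

```python
import time


def wait_for_status(describe, status_key, terminal=("Completed", "Failed", "Stopped"),
                    poll_seconds=30, timeout_seconds=3 * 60 * 60, sleep=time.sleep):
    """Poll describe() until response[status_key] is terminal or the timeout passes."""
    waited = 0
    while True:
        status = describe()[status_key]
        print(status)
        if status in terminal:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"still {status} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# Usage with the SageMaker client from this notebook:
# wait_for_status(lambda: sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name),
#                 "AutoMLJobStatus")
# wait_for_status(lambda: sm.describe_transform_job(TransformJobName=transform_job_name),
#                 "TransformJobStatus")
```

Passing sleep as a parameter keeps the helper easy to test without actually waiting.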

Results

Now use the describe_auto_ml_job API to look up the best candidate selected by the SageMaker AutoPilot job.

[ ]:
import pprint

best_candidate = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)["BestCandidate"]
best_candidate_name = best_candidate["CandidateName"]

print("CandidateName: " + best_candidate_name)
print(
    "FinalAutoMLJobObjectiveMetricName: "
    + best_candidate["FinalAutoMLJobObjectiveMetric"]["MetricName"]
)
print(
    "FinalAutoMLJobObjectiveMetricValue: "
    + str(best_candidate["FinalAutoMLJobObjectiveMetric"]["Value"])
)
print()
pprint.pprint(best_candidate)

Batch Inference

Now that the SageMaker AutoPilot job has completed, let’s create a model from the best candidate. Its InferenceContainers chain AutoPilot’s data-processing and model containers into an inference pipeline.

[ ]:
model_name = "automl-form13-model-" + timestamp_suffix
model = sm.create_model(
    Containers=best_candidate["InferenceContainers"], ModelName=model_name, ExecutionRoleArn=role
)
print("Model ARN corresponding to the best candidate is: {}".format(model["ModelArn"]))

We can use batch inference through Amazon SageMaker batch transform. The same model can also be deployed to perform online inference using Amazon SageMaker hosting.

[ ]:
transform_job_name = "automl-form13-transform-" + timestamp_suffix

transform_input = {
    "DataSource": {"S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": test_data_s3_path}},
    "ContentType": "text/csv",
    "CompressionType": "None",
    "SplitType": "Line",
}

transform_output = {
    "S3OutputPath": "s3://{}/{}/inference-results".format(bucket, prefix),
}

transform_resources = {"InstanceType": "ml.m5.4xlarge", "InstanceCount": 1}

sm.create_transform_job(
    TransformJobName=transform_job_name,
    ModelName=model_name,
    TransformInput=transform_input,
    TransformOutput=transform_output,
    TransformResources=transform_resources,
)

Now we can watch the transform job for completion. That takes approximately 20 minutes.

[ ]:
print("JobStatus")
print("---------")

describe_response = sm.describe_transform_job(TransformJobName=transform_job_name)
job_run_status = describe_response["TransformJobStatus"]
print(job_run_status)

while job_run_status not in ("Failed", "Completed", "Stopped"):
    describe_response = sm.describe_transform_job(TransformJobName=transform_job_name)
    job_run_status = describe_response["TransformJobStatus"]
    print(job_run_status)
    sleep(30)

Now let’s build the S3 URL of the transform job results. You can find the object at that location in the Amazon S3 console.

[ ]:
bucket = session.default_bucket()
# Batch transform names the output after the input file (test.csv) plus ".out"
key = "{}/inference-results/test.csv.out".format(prefix)
url = "s3://{}/{}".format(bucket, key)

print(url)
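Batch transform names each output object after its input object with .out appended, so rather than hard-coding the key you can derive it from the paths already defined above. This is a minimal sketch for the single-input-file case used here; the helper name transform_output_uri is hypothetical.

```python
import posixpath


def transform_output_uri(output_path: str, input_uri: str) -> str:
    """S3 URI that batch transform writes for a single input object (sketch)."""
    # The output object keeps the input file name and adds a ".out" suffix
    file_name = posixpath.basename(input_uri.rstrip("/"))
    return output_path.rstrip("/") + "/" + file_name + ".out"


# Usage with the variables from this notebook:
# print(transform_output_uri(transform_output["S3OutputPath"], test_data_s3_path))
```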

View All Candidates

You can view all the candidates (pipeline evaluations with different hyperparameter combinations) that were explored by SageMaker AutoPilot and sort them by their final performance metric.

[ ]:
candidates = sm.list_candidates_for_auto_ml_job(
    AutoMLJobName=auto_ml_job_name, SortBy="FinalObjectiveMetricValue"
)["Candidates"]

# Print each candidate's position, name and final objective metric value
for index, candidate in enumerate(candidates):
    print(
        "{}  {}  {}".format(
            index, candidate["CandidateName"], candidate["FinalAutoMLJobObjectiveMetric"]["Value"]
        )
    )

Candidate Generation Notebook

SageMaker AutoPilot also auto-generates a Candidate Definitions notebook. This notebook can be used to interactively step through the various steps SageMaker AutoPilot took to arrive at the best candidate. It can also be used to override various runtime parameters like parallelism, hardware used, algorithms explored, feature extraction scripts and more.

This code downloads a file from our SageMaker bucket using the SageMaker session.

[ ]:
def downloadNotebook(s3_path):
    """Download a text file (such as a notebook) from S3 into the working directory."""
    session = sagemaker.Session()

    # Split "s3://bucket/path/to/file.ipynb" into bucket, key and file name
    s3_path_parts = s3_path.replace("s3://", "").split("/")
    bucket, key, file = s3_path_parts[0], "/".join(s3_path_parts[1:]), s3_path_parts[-1]

    print("Downloading s3://{}/{} to {}".format(bucket, key, file))
    notebook = session.read_s3_file(bucket, key)
    with open(file, "w") as text_file:
        text_file.write(notebook)
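The string splitting in downloadNotebook works for the paths AutoPilot returns, but urllib.parse can do the same job while also rejecting strings that are not S3 URIs. A minimal sketch, assuming keys without "?" or "#" characters (which urlparse would treat as query or fragment); the helper name parse_s3_uri is hypothetical.

```python
from urllib.parse import urlparse


def parse_s3_uri(s3_uri: str):
    """Split an s3://bucket/key URI into (bucket, key, file_name)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3":
        raise ValueError(f"not an S3 URI: {s3_uri}")
    key = parsed.path.lstrip("/")
    # The file name is the last path segment of the key
    return parsed.netloc, key, key.rsplit("/", 1)[-1]
```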

We can download the notebook with the command:

[ ]:
notebook_s3_path = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)["AutoMLJobArtifacts"][
    "CandidateDefinitionNotebookLocation"
]
downloadNotebook(notebook_s3_path)

Data Exploration Notebook

SageMaker Autopilot also auto-generates a Data Exploration notebook. This code will download that notebook:

[ ]:
notebook_s3_path = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)["AutoMLJobArtifacts"][
    "DataExplorationNotebookLocation"
]
downloadNotebook(notebook_s3_path)

Cleanup

SageMaker stores its data in an Amazon S3 bucket. You may want to delete the results of our job from that bucket once you’re done working with them.

Deploying Neo4j Enterprise Edition from the AWS Marketplace listing created an AWS CloudFormation stack. To delete the deployment, navigate to AWS CloudFormation in the console and delete the stack there. Be sure to delete the entire stack, as that deletes all of its subcomponents.

Conclusion

In this notebook, you deployed Neo4j Enterprise Edition. Within SageMaker Studio, you then loaded a data set into Neo4j Graph Database. You used Neo4j Graph Data Science to compute a graph embedding on that dataset. Using that embedding, you ran a SageMaker AutoPilot job and inspected the output.

This same flow can be repurposed to add graph embeddings to your own machine learning jobs. Graph embeddings are just one sort of graph feature that can be used in machine learning. The approach we used here would apply equally to other graph features, such as betweenness centrality or neighborhood-based features.