Feature Store with Aerospike

For an interactive Jupyter notebook experience: Binder

This notebook is the first in the series of notebooks that show how Aerospike can be used as a feature store.

This notebook requires the Aerospike Database and Spark running locally with Aerospike Spark Connector. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.

Introduction

This notebook demonstrates how Aerospike can be used as a Feature Store for Machine Learning applications on Spark using the Aerospike Spark Connector. It is part of the Feature Store series of notebooks, and focuses on the data model and Feature Engineering aspects of a Feature Store. The subsequent notebook(s) will describe the Model Training and Model Serving aspects of an ML application that deal with a Feature Store.

Reference Architecture


This notebook is organized in two parts:

  1. The first part explains the key objects, data model, and operations of a Feature Store, and provides a simple implementation on top of the Aerospike Database.
  2. The second part shows with two examples how the model developed in the first part can be used for Feature Engineering and storing features in the Feature Store.

Prerequisites

This tutorial assumes familiarity with the following topics:

Setup

Set up Aerospike Server, Spark Server, and Spark Connector.

Ensure Database Is Running

This notebook requires that the Aerospike database is running.

!asd >& /dev/null
!pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"

Output:

Aerospike database is running!

Initialize Spark

We will be using Spark functionality in this notebook.

Initialize Paths and Env Variables

# directory where spark notebook requisites are installed
#SPARK_NB_DIR = '/home/jovyan/notebooks/spark'
SPARK_NB_DIR = '/opt/spark-nb'
SPARK_HOME = SPARK_NB_DIR + '/spark-3.0.3-bin-hadoop3.2'
# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST ="localhost"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test"
AEROSPIKE_SPARK_JAR_VERSION="3.2.0"
AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
# Next we locate the Spark installation - this will be found using the SPARK_HOME environment variable that you will have set 
import findspark
findspark.init(SPARK_HOME)
# Aerospike Spark Connector related settings
import os
AEROSPIKE_JAR_PATH= "aerospike-spark-assembly-"+AEROSPIKE_SPARK_JAR_VERSION+".jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + SPARK_NB_DIR + '/' + AEROSPIKE_JAR_PATH + ' pyspark-shell'

Configure Spark Session

Please visit Configuring Aerospike Connect for Spark for more information about the properties used on this page.

# imports
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType
sc = SparkContext.getOrCreate()
conf=sc._conf.setAll([("aerospike.namespace",AS_NAMESPACE),("aerospike.seedhost",AS_CONNECTION_STRING)])
sc.stop()
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

Access Shell Commands

You may execute shell commands including Aerospike tools like aql and asadm in the terminal tab throughout this tutorial. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal.

Online and Offline Feature Stores

Aerospike can be used as a Feature Store in cases where the broader capabilities of a commercial Feature Store are not warranted, and where it is critical to have:

  • Sub-millisecond online access to features
  • Scaling to a very large number of entities and features
  • Convenient and reliable distributed deployment
  • Small server footprint for a low cost of operation

An Online Feature Store provides access to features in real time, and as such requires speed and, for many deployments, scalability. The Online Store only stores the latest version of feature values.

Aerospike is purpose built for the performance and scale requirements of an Online Store.

An Offline Store requires historic data to be maintained for “time travel” use cases, that is, to create datasets at a particular point in time for training and other needs. A time series of feature values can be maintained in Aerospike as a map (timestamp->value) for each feature. Aerospike's native APIs allow efficient implementation of an Offline Store.

Offline-Online Consistency

An essential requirement for a Feature Store is to maintain Online-Offline consistency to ensure that the models that were trained with the Offline (historic) data remain valid in the production (Online) environment. Typically this means convenient, efficient, and timely sync of data from the Offline to the Online Store. Sometimes a reverse sync from the Online to the Offline Store may be required if updates are made only to the Online Store.

Synchronization in either direction between Online and Offline Stores can be achieved through various streaming connectors that Aerospike provides (such as for Kafka and Pulsar).

What is also necessary is to be able to access a specific version “as of time” (a special case being the latest version) for various use cases using the appropriate client library or connector.
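
As a rough illustration of the "as of time" access described above, the following sketch models one feature's time series as a plain Python dict (timestamp -> value) and looks up the version in effect at a given time. It is an in-memory stand-in rather than Aerospike client or connector code; the function names `value_as_of` and `latest_value` and the sample data are hypothetical.

```python
from bisect import bisect_right


def value_as_of(series, as_of_ts):
    """Return the most recent value whose timestamp is <= as_of_ts, or None."""
    timestamps = sorted(series)
    pos = bisect_right(timestamps, as_of_ts)
    if pos == 0:
        return None  # no version existed yet at as_of_ts
    return series[timestamps[pos - 1]]


def latest_value(series):
    """Latest version: the special case that an Online Store keeps."""
    return series[max(series)] if series else None


# hypothetical time series for one feature of one entity (epoch seconds -> value)
txn_count = {1609459200: 3, 1612137600: 7, 1614556800: 12}

print(value_as_of(txn_count, 1612137600 + 10))
print(latest_value(txn_count))
```

Sorting the timestamps on every call keeps the sketch short; a real implementation would more likely rely on an ordered map so that the lookup does not require a full sort.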

Our focus in this Feature Store series will be primarily on the Online Store.

Design Considerations

The key design criteria for a Feature Store with Aerospike are access and storage efficiency. (The other key criterion, Offline - Online synchronization, will not be discussed further here. For our purpose here, we assume that can be handled with appropriate change notification and update setup between Offline and Online Stores.)

We will assume the following object model which is typical in a Feature Store:

  • Precomputed Feature Values are stored for Entities such as users, credit card transactions, and sensors.
  • Features are organized in Feature Groups. A Feature Group signifies a set of features that share common data sources and a pipeline, and that are created and updated together.
  • A Dataset is a snapshot of specified Features across a set of Entity instances. The snapshot is stored outside the Feature Store, whereas the Feature Store holds the metadata such as the snapshot query and location.
  • The Online Store keeps only the latest version of Feature Values, whereas the Offline store maintains a time series of Feature Values.

Online Store

There are multiple ways to store the latest Feature Values, each with its trade-offs. Here are two possible options:

  1. One Entity instance record holds all Feature Values across multiple Feature Groups.

    • Stored in a set named after the Entity type (e.g., "user-features").
    • Read and write for a Feature Group's Features entails listing the Feature bins. Accessing a Feature vector at Model Serving time entails enumerating the model's Feature bins across Feature Groups. To avoid Feature name collisions across Feature Groups, a bin name needs to include both the Feature Group and Feature name, such as Feature-Group:Feature.
    • Pros:
      • Fast and convenient read and write access to all features of an entity.
    • Cons:
      • A single last-update-time must be shared among all Feature Groups without additional mechanisms.
  2. Multiple Entity instance records, one for each Feature Group.

    • Each Feature Group's values are stored in its own set named in the form Entity:FeatureGroup-features, such as "user:fg1-features".
    • Read and write for a Feature Group's Features entails listing the Feature bins. Accessing a Feature vector at Model Serving time entails multiple requests over Feature Groups. Bin names are the same as Feature names since they are unique within a Feature Group.
    • Pros:
      • Provides a better organization.
      • Allows for a large number of Features if they may exceed the record size limit.
      • Allows a separate timestamp for each Feature Group since each is updated separately.
    • Cons:
      • Multiple requests must be issued to access Features across Feature Groups.
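
The naming conventions of the two options above can be sketched as follows. This is only an illustration of the set and bin names given in the text and of the resulting number of record reads per feature vector; the helper names `option1_location`, `option2_location`, and `sets_to_read` and the sample feature list are hypothetical.

```python
def option1_location(entity_type, feature_group, feature):
    """Option 1: one record per entity; the bin name carries the Feature Group."""
    set_name = f"{entity_type}-features"
    bin_name = f"{feature_group}:{feature}"
    return set_name, bin_name


def option2_location(entity_type, feature_group, feature):
    """Option 2: one record per Feature Group; the bin name is the Feature name."""
    set_name = f"{entity_type}:{feature_group}-features"
    return set_name, feature


def sets_to_read(locate, entity_type, model_features):
    """Distinct sets (hence record reads) needed to assemble one feature vector."""
    return {locate(entity_type, fg, f)[0] for fg, f in model_features}


# hypothetical model that uses features from two Feature Groups
model_features = [("fg1", "f1"), ("fg1", "f2"), ("fg2", "f1")]

print(option1_location("user", "fg1", "f1"))
print(option2_location("user", "fg1", "f1"))
print(len(sets_to_read(option1_location, "user", model_features)))
print(len(sets_to_read(option2_location, "user", model_features)))
```

With this sample list, the first option touches a single set while the second touches one set per Feature Group, which is the trade-off listed under the cons of each option.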

Offline Store

The design requires multiple versions of Feature Values to be stored, each with its timestamp. One way of modeling such a time series of feature values in Aerospike is to store each feature as a map timestamp->value.

Modeling for Size

With historic versions stored in it, the Offline Store has to account for the data size problem: both in terms of individual record size as well as overall database size.

  • Record size
    • Limit the number of versions in each record by designating a record by its time duration. So Feature Values for an Entity instance will have multiple records by hour, day, week, month, or year, depending on the frequency of updates. The record will have a compound key of the form entity-key:duration-id like "123:202101", for example, for a record holding feature values during January 2021 for a user with id 123.
    • As discussed above, the record size can be kept small by storing each Feature Group's Feature Values in its own set.
  • Database size
    • To limit the size of the Offline Store, older data can be archived in a batch process. An indexed bin in a record holds the record's duration specifier, allowing efficient access to old records for archival. For example, weekly records will have an indexed bin with the year and week values concatenated, like "202140".
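
The compound keys and duration specifiers described above can be sketched as follows. The helper names are hypothetical, and the use of ISO week numbering for weekly records is an assumption; the text only says that the year and week values are concatenated.

```python
from datetime import date


def duration_id(day, granularity):
    """Duration specifier for the record that covers `day`."""
    if granularity == "month":
        return day.strftime("%Y%m")
    if granularity == "week":
        # assumption: ISO week numbering
        iso_year, iso_week, _ = day.isocalendar()
        return f"{iso_year}{iso_week:02d}"
    raise ValueError("unsupported granularity: " + granularity)


def record_key(entity_key, day, granularity):
    """Compound key of the form entity-key:duration-id."""
    return f"{entity_key}:{duration_id(day, granularity)}"


def archivable(duration_ids, cutoff_id):
    """Fixed-width ids compare correctly as strings, so older ids sort first."""
    return [d for d in duration_ids if d < cutoff_id]


print(record_key(123, date(2021, 1, 15), "month"))
print(duration_id(date(2021, 10, 6), "week"))
print(archivable(["202138", "202140", "202141"], "202140"))
```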

Improving Performance and Scalability

Before we dive into the implementation, here are some relevant performance and scalability aspects in use of the Spark Connector.

Random access with primary key

Random access using the primary key must use the __key bin for direct access through the primary index. Many times the unique user key is duplicated for convenience in another bin, but the equality predicate using another bin will result in a scan. This is shown below in single object load methods.

Efficient scan with set indexes

A set index should be enabled on a set for faster scans over the set. Data exploration and dataset creation operations benefit because they scan only the records in the set rather than the entire namespace. The speedup can be significant, since the namespace can be very large compared to, say, the metadata sets. In the following implementation, we enable set indexes on all sets.
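
As a small sketch, the info command used in this notebook to enable a set index can be generated for each set rather than typed by hand. The helper name `enable_set_index_command` is hypothetical, and the code only builds the command strings; it does not run them.

```python
def enable_set_index_command(namespace, set_name):
    """Build the set-config info command that enables a set index."""
    return (
        "set-config:context=namespace;id=" + namespace
        + ";set=" + set_name + ";enable-index=true"
    )


# sets used by the simple Feature Store in this notebook
feature_store_sets = ["fg-metadata", "feature-metadata", "dataset-metadata", "cctxn-features"]

for set_name in feature_store_sets:
    # each printed line can be run in a notebook cell with a leading "!"
    print('asinfo -v "' + enable_set_index_command("test", set_name) + '"')
```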

Aerospike Expression pushdown

To minimize the amount of data retrieved from the database, query predicates must be "pushed down", or in other words, processed in the database and not on Spark. We will illustrate how expression pushdown is implemented with examples in the Model Training notebook.

Performance tuning parameters

Several tuning parameters such as partition factor and compression are available for optimizing performance with the Spark Connector. The implementation here does not make use of them, but you are encouraged to explore them elsewhere.

Use of secondary indexes

While Aerospike provides secondary indexes, the Spark Connector currently does not leverage them. This performance enhancement is planned in the future.

Defining a Simple Feature Store

To make it concrete, let us define and implement a simple Feature Store.

Main Objects

The main objects in a Feature Store as discussed above are Feature Group, Feature, Entity, and Dataset. These are explained below.

Feature Group

Machine Learning features are grouped by the source of data and processing pipeline. A Feature Group includes many features, and has the following attributes that are useful during feature exploration:

  • name: a unique name
  • description: human readable description
  • source: source of fact data
  • attrs: other metadata
  • tags: associated tags

A feature group is stored in the set "fg-metadata".

Feature

A Feature consists of the following attributes:

  • fid: the record's primary key consists of the string concatenation of feature group name and feature name in the form: fgname_fname
  • fgname: the name of the feature group that the feature belongs to
  • name: feature name that is unique within the feature group
  • type: feature type (integer, double, string, and possibly others)
  • description: human readable description
  • attrs: various performance stats and other metadata
  • tags: associated tags

A feature is stored in the set "feature-metadata".

Entity

Features are computed and stored for an Entity such as a user, credit card, or sensor. Feature values are stored per Entity instance. Features in multiple feature groups for an Entity type are combined in one Entity record. A feature values record for an Entity has these attributes:

  • id_col: the column containing the id of the entity instance that serves as the record key
  • feature specific bins: each feature is stored in a bin named after the feature in the format fgname_fname.
  • timestamp: update timestamp

Entity records for an entity type are stored in an entity-type specific set "entitytype-features". For example, the set "user-features" holds features for user instances, and "cc-features" holds features for credit card instances.

Dataset

A Dataset is a subset of features and entities selected to train an ML model. A Dataset object holds the selected features and entity instance definitions. The actual copy of entities is stored outside the feature store (for instance, in a file system). A dataset record has the following attributes.

  • name: name of the data set, serves as the primary key for the record
  • description: human readable description
  • features: a list of the dataset features
  • predicate: query predicate to enumerate the entity instances in the dataset
  • location: external location where the dataset is stored
  • attrs: other metadata
  • tags: associated tags

Datasets are stored in the set "dataset-metadata".
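
The attributes listed above can be pictured as a plain record. The following is a hypothetical sketch only, since the Dataset implementation is deferred to the Model Training notebook; the class name `DatasetRecord` and all sample values are assumptions made for illustration.

```python
from dataclasses import dataclass, field, astuple


@dataclass
class DatasetRecord:
    name: str          # primary key of the record
    description: str   # human readable description
    features: list     # names of the dataset features
    predicate: str     # query predicate that enumerates the entity instances
    location: str      # external location where the dataset is stored
    attrs: dict = field(default_factory=dict)
    tags: list = field(default_factory=list)


# hypothetical dataset definition
ds = DatasetRecord(
    name="ds_example",
    description="example dataset",
    features=["fg_name1_f_name1", "fg_name1_f_name2"],
    predicate="fg_name1_f_name1 > 0",
    location="/tmp/ds_example",
    tags=["example"],
)

# a tuple in field order, similar to the rows built in the save methods later on
row = astuple(ds)
print(row)
```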

Operations

The following operations are implemented for the different use scenarios:

  • Feature Engineering

    • Feature Group
      • create or update (save)
      • load (get)
    • Feature
      • create or update
      • load
    • Entity
      • create or update using a dataframe
      • load
  • Model Training

    • Feature Group
      • load
      • query by various attributes
    • Feature
      • query
    • Entity
      • query
    • Dataset
      • create
      • load
      • query
  • Model Serving

    • Entity
      • get a specific feature vector for an entity instance
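
To make the Model Serving operation concrete, here is a minimal in-memory sketch of assembling a feature vector from one entity record. It assumes bins named in the fgname_fname format and feature list elements in the "fg_name|name" form noted in the Entity implementation; it is not the implementation deferred to the Model Serving notebook, and the function name `feature_vector` and the sample record are hypothetical.

```python
def feature_vector(record, feature_list):
    """record: dict of bin name -> value; feature_list items are 'fg_name|name'."""
    vector = []
    for item in feature_list:
        fg_name, name = item.split("|", 1)
        # bins are assumed to be named fgname_fname; missing bins yield None
        vector.append(record.get(fg_name + "_" + name))
    return vector


# hypothetical entity record as it might be read from a "-features" set
record = {"ID": "eid_1", "fg2_feature_1": 1, "fg2_feature_2": 12.4}

print(feature_vector(record, ["fg2|feature_2", "fg2|feature_1"]))
```

The vector follows the order of the requested feature list rather than the order of the bins in the record, which is what a model expecting a fixed input layout would need.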

Example Implementation

The following code is a simple implementation of the above operations. These operations will be illustrated in this and follow-up notebooks.

Feature Group Implementation

import copy

class FeatureGroup:
    schema = StructType([StructField("name", StringType(), False),
                         StructField("description", StringType(), True),
                         StructField("source", StringType(), True),
                         StructField("attrs", MapType(StringType(), StringType()), True),
                         StructField("tags", ArrayType(StringType()), True)])

    def __init__(self, name, description, source, attrs, tags):
        self.name = name
        self.description = description
        self.source = source
        self.attrs = attrs
        self.tags = tags
        return

    def __str__(self):
        return str(self.__class__) + ": " + str(self.__dict__)

    def save(self):
        inputBuf = [(self.name, self.description, self.source, self.attrs, self.tags)]
        inputRDD = spark.sparkContext.parallelize(inputBuf)
        inputDF = spark.createDataFrame(inputRDD, FeatureGroup.schema)
        # Write the data frame to Aerospike, the name field is used as the key
        inputDF.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", "fg-metadata") \
            .option("aerospike.updateByKey", "name") \
            .save()
        return

    @staticmethod
    def load(name):
        fg = None
        schema = copy.deepcopy(FeatureGroup.schema)
        schema.add("__key", StringType(), False)
        fgdf = spark.read \
            .format("aerospike") \
            .option("aerospike.set", "fg-metadata") \
            .schema(schema) \
            .load().where("__key = \"" + name + "\"")
        if fgdf.count() > 0:
            fgtuple = fgdf.collect()[0]
            fg = FeatureGroup(*fgtuple[:-1])
        return fg

    @staticmethod
    def query(predicate):  # returns a dataframe
        fg_df = spark.read \
            .format("aerospike") \
            .schema(FeatureGroup.schema) \
            .option("aerospike.set", "fg-metadata") \
            .load().where(predicate)
        return fg_df
# Enable set index on fg-metadata
!asinfo -v "set-config:context=namespace;id=test;set=fg-metadata;enable-index=true"

Output:

ok
# test feature group 
# test save and load
# save
fg1 = FeatureGroup("fg_name1", "fg_desc1", "fg_source1",
{"fg_attr1":"1", "fg_attr2":"two"}, ["fg_tag1", "fg_tag2"])
fg1.save()
# load
fg2 = FeatureGroup.load("fg_name1")
print(fg2, '\n')
# test query
print("Feature Groups with name ending with '_name1' and having attribute 'fg_attr1'='1':")
df = FeatureGroup.query("name like '%_name1' and attrs.fg_attr1 == '1'")
df.show()

Output:

<class '__main__.FeatureGroup'>: {'name': 'fg_name1', 'description': 'fg_desc1', 'source': 'fg_source1', 'attrs': {'fg_attr2': 'two', 'fg_attr1': '1'}, 'tags': ['fg_tag1', 'fg_tag2']}

Feature Groups with name ending with '_name1' and having attribute 'fg_attr1'='1':
+--------+-----------+----------+--------------------+------------------+
|    name|description|    source|               attrs|              tags|
+--------+-----------+----------+--------------------+------------------+
|fg_name1|   fg_desc1|fg_source1|[fg_attr1 -> 1, f...|[fg_tag1, fg_tag2]|
+--------+-----------+----------+--------------------+------------------+

Feature Implementation

class Feature:
    schema = StructType([StructField("fid", StringType(), False),
                         StructField("fgname", StringType(), False),
                         StructField("name", StringType(), False),
                         StructField("type", StringType(), False),
                         StructField("description", StringType(), True),
                         StructField("attrs", MapType(StringType(), StringType()), True),
                         StructField("tags", ArrayType(StringType()), True)])

    def __init__(self, fgname, name, ftype, description, attrs, tags):
        self.fid = fgname + '_' + name
        self.fgname = fgname
        self.name = name
        self.ftype = ftype
        self.description = description
        self.attrs = attrs
        self.tags = tags
        return

    def __str__(self):
        return str(self.__class__) + ": " + str(self.__dict__)

    def save(self):
        inputBuf = [(self.fid, self.fgname, self.name, self.ftype, self.description, self.attrs, self.tags)]
        inputRDD = spark.sparkContext.parallelize(inputBuf)
        inputDF = spark.createDataFrame(inputRDD, Feature.schema)
        # Write the data frame to Aerospike, the fid field is used as the key
        inputDF.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", "feature-metadata") \
            .option("aerospike.updateByKey", "fid") \
            .save()
        return

    @staticmethod
    def load(fgname, name):
        f = None
        schema = copy.deepcopy(Feature.schema)
        schema.add("__key", StringType(), False)
        f_df = spark.read \
            .format("aerospike") \
            .schema(schema) \
            .option("aerospike.set", "feature-metadata") \
            .load().where("__key = \"" + fgname + '_' + name + "\"")
        if f_df.count() > 0:
            f_tuple = f_df.collect()[0]
            # drop the leading fid and the trailing __key; fid is rebuilt in __init__
            f = Feature(*f_tuple[1:-1])
        return f

    @staticmethod
    def query(predicate):  # returns a dataframe
        f_df = spark.read \
            .format("aerospike") \
            .schema(Feature.schema) \
            .option("aerospike.set", "feature-metadata") \
            .load().where(predicate)
        return f_df
# Enable set index on feature-metadata
!asinfo -v "set-config:context=namespace;id=test;set=feature-metadata;enable-index=true"

Output:

ok
# test Feature
# test save and load
# save
feature1 = Feature("fg_name1", "f_name1", "integer", "f_desc1",
{"f_attr1":"1", "f_attr2":"two"}, ["f_tag1", "f_tag2"])
feature1.save()
# load
f1 = Feature.load("fg_name1", "f_name1")
print(f1, '\n')
# test query
feature2 = Feature("fg_name1", "f_name2", "double", "f_desc2",
{"f_attr1":"1.0", "f_attr3":"three"}, ["f_tag1", "f_tag3"])
feature2.save()
print("Features having name starting with 'f_name' and tagged with 'f_tag1':")
f_df = Feature.query("name like 'f_name%' and array_contains(tags, 'f_tag1')")
f_df.show()

Output:

<class '__main__.Feature'>: {'fid': 'fg_name1_f_name1', 'fgname': 'fg_name1', 'name': 'f_name1', 'ftype': 'integer', 'description': 'f_desc1', 'attrs': {'f_attr2': 'two', 'f_attr1': '1'}, 'tags': ['f_tag1', 'f_tag2']} 

Features having name starting with 'f_name' and tagged with 'f_tag1':
+----------------+--------+-------+-------+-----------+--------------------+----------------+
|             fid|  fgname|   name|   type|description|               attrs|            tags|
+----------------+--------+-------+-------+-----------+--------------------+----------------+
| fgname1_f_name1| fgname1|f_name1|integer|    f_desc1|[etype -> etype1,...|[f_tag1, f_tag2]|
|fg_name1_f_name2|fg_name1|f_name2| double|    f_desc2|[f_attr1 -> 1.0, ...|[f_tag1, f_tag3]|
| fgname1_f_name2| fgname1|f_name2| double|    f_desc2|[etype -> etype1,...|[f_tag1, f_tag3]|
|fg_name1_f_name1|fg_name1|f_name1|integer|    f_desc1|[f_attr1 -> 1, f_...|[f_tag1, f_tag2]|
+----------------+--------+-------+-------+-----------+--------------------+----------------+

Entity Implementation

class Entity:

    def __init__(self, etype, record, id_col):
        # record is an array of triples (name, type, value)
        self.etype = etype
        self.record = record
        self.id_col = id_col
        return

    def __str__(self):
        return str(self.__class__) + ": " + str(self.__dict__)

    @staticmethod
    def get_schema(record):
        schema = StructType()
        for f in record:
            schema.add(f[0], f[1], True)
        return schema

    @staticmethod
    def get_id_type(schema, id_col):
        return schema[id_col].dataType.typeName()

    def save(self, schema):
        fvalues = [f[2] for f in self.record]
        inputBuf = [tuple(fvalues)]
        inputRDD = spark.sparkContext.parallelize(inputBuf)
        inputDF = spark.createDataFrame(inputRDD, schema)
        # Write the data frame to Aerospike, the id_col field is used as the key
        inputDF.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", self.etype + '-features') \
            .option("aerospike.updateByKey", self.id_col) \
            .save()
        return

    @staticmethod
    def load(etype, eid, schema, id_col):
        ent = None
        ent_df = spark.read \
            .format("aerospike") \
            .schema(schema) \
            .option("aerospike.set", etype + '-features') \
            .option("aerospike.keyType", "string") \
            .load().where(id_col + " = \"" + eid + "\"")
        if ent_df.count() > 0:
            ent_tuple = ent_df.collect()[0]
            record = [(schema[i].name, schema[i].dataType.typeName(), fv) for i, fv in enumerate(ent_tuple)]
            ent = Entity(etype, record, id_col)
        return ent

    @staticmethod
    def saveDF(df, etype, id_col):  # save a dataframe
        # df: dataframe consisting of feature values
        # etype: entity type (such as user or sensor)
        # id_col: column name that holds the primary key
        # Write the data frame to Aerospike, the column in id_col is used as the key
        df.write \
            .mode('overwrite') \
            .format("aerospike") \
            .option("aerospike.writeset", etype + '-features') \
            .option("aerospike.updateByKey", id_col) \
            .save()
        return

    @staticmethod
    def query(etype, predicate, schema, id_col):  # returns a dataframe
        ent_df = spark.read \
            .format("aerospike") \
            .schema(schema) \
            .option("aerospike.set", etype + '-features') \
            .option("aerospike.keyType", Entity.get_id_type(schema, id_col)) \
            .load().where(predicate)
        return ent_df

    @staticmethod
    def get_feature_vector(etype, eid, feature_list):  # elements in feature_list are in "fg_name|name" form
        # deferred to Model Serving notebook
        pass
# Enable set index on cctxn-features
!asinfo -v "set-config:context=namespace;id=test;set=cctxn-features;enable-index=true"

Output:

ok
# test Entity
# test save and load
# save
features1 = [('fg1:f_name1', IntegerType(), 1), ('fg1:f_name2', DoubleType(), 2.0), ('fg1:f_name3', StringType(), 'three')]
record1 = [('eid', StringType(), 'eid1')] + features1
ent1 = Entity('entity_type1', record1, 'eid')
schema = Entity.get_schema(record1)
ent1.save(schema);
# load
e1 = Entity.load('entity_type1', 'eid1', schema, 'eid')
print(e1, '\n')
# test query
features2 = [('fg1:f_name1', IntegerType(), 10), ('fg1:f_name2', DoubleType(), 20.0), ('fg1:f_name3', StringType(), 'thirty')]
record2 = [('eid', StringType(), 'eid2')] + features2
ent2 = Entity('entity_type2', record2, 'eid')
ent2.save(schema);
# query 1
print("Instances of entity type entity_type1 with eid ending in 1:")
instances = Entity.query('entity_type1', 'eid like "%1"', schema, 'eid')
instances.show()
# query 2
print("Instances of entity type entity_type2 with eid in ['eid2']:")
instances = Entity.query('entity_type2', 'eid in ("eid2")', schema, 'eid')
instances.show()

Output:

<class '__main__.Entity'>: {'etype': 'entity_type1', 'record': [('eid', 'string', 'eid1'), ('fg1:f_name1', 'integer', 1), ('fg1:f_name2', 'double', 2.0), ('fg1:f_name3', 'string', 'three')], 'id_col': 'eid'} 

Instances of entity type entity_type1 with eid ending in 1:
+----+-----------+-----------+-----------+
| eid|fg1:f_name1|fg1:f_name2|fg1:f_name3|
+----+-----------+-----------+-----------+
|eid1|          1|        2.0|      three|
+----+-----------+-----------+-----------+

Instances of entity type entity_type2 with eid in ['eid2']:
+----+-----------+-----------+-----------+
| eid|fg1:f_name1|fg1:f_name2|fg1:f_name3|
+----+-----------+-----------+-----------+
|eid2|         10|       20.0|     thirty|
+----+-----------+-----------+-----------+
# test save dataframe

from pyspark.sql import SparkSession, Row

data = [{"ID": 'eid_1', "fg2_feature_1": 1, "fg2_feature_2": 12.40},
{"ID": 'eid_2', "fg2_feature_1": 2, "fg2_feature_2": 30.10},
{"ID": 'eid_3', "fg2_feature_1": 3, "fg2_feature_2": 100.01}
]

# create and save a dataframe
df = spark.createDataFrame([Row(**i) for i in data])
df.show()
Entity.saveDF(df, "etype_1", "ID")

Output:

+-----+-------------+-------------+
|   ID|fg2_feature_1|fg2_feature_2|
+-----+-------------+-------------+
|eid_1|            1|         12.4|
|eid_2|            2|         30.1|
|eid_3|            3|       100.01|
+-----+-------------+-------------+
# test query operation that returns a dataframe
# need to define a schema first
schema = StructType([
StructField('ID', StringType(), False),
StructField('fg2_feature_1', IntegerType(), True),
StructField('fg2_feature_2', DoubleType(), True)])
queryDF = Entity.query('etype_1', 'ID in ("eid_1", "eid_3")', schema, 'ID')
queryDF.show()

Output:

+-----+-------------+-------------+
|   ID|fg2_feature_1|fg2_feature_2|
+-----+-------------+-------------+
|eid_1|            1|         12.4|
|eid_3|            3|       100.01|
+-----+-------------+-------------+

Dataset Implementation

# Deferred to the second notebook in this series on Model Training.

Using Feature Store

Let us now see how the feature store objects and operations can be leveraged through the various phases of the ML flow. In this notebook, the focus is on Feature Engineering. Future notebooks will look into Model Training and Model Serving scenarios.

Example: Credit Card Fraud Data

The demo data is abridged from its original version from here. It represents real transactions by European cardholders in 2013. The original dataset has close to 300K transactions, whereas the abridged version used here contains about a thousand records.

We will illustrate use of the feature store for Feature Engineering through the following sequence:

  • Read the demo data from a csv file.
  • Perform feature engineering tasks.
  • Save the engineered features to the Feature Store.

Read Data into Dataframe

Read and examine the columns and rows.

The data contains PCA-transformed features ("V1"-"V28"), with 29 feature columns and 1 label ("Class") column.

import pandas as pd 
data=pd.read_csv("resources/creditcard_small.csv")
data.info()

Output:

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 984 entries, 0 to 983
Data columns (total 32 columns):
 #   Column      Non-Null Count  Dtype
---  ------      --------------  -----
 0   Unnamed: 0  984 non-null    int64
 1   Time        984 non-null    float64
 2   V1          984 non-null    float64
 3   V2          984 non-null    float64
 4   V3          984 non-null    float64
 5   V4          984 non-null    float64
 6   V5          984 non-null    float64
 7   V6          984 non-null    float64
 8   V7          984 non-null    float64
 9   V8          984 non-null    float64
 10  V9          984 non-null    float64
 11  V10         984 non-null    float64
 12  V11         984 non-null    float64
 13  V12         984 non-null    float64
 14  V13         984 non-null    float64
 15  V14         984 non-null    float64
 16  V15         984 non-null    float64
 17  V16         984 non-null    float64
 18  V17         984 non-null    float64
 19  V18         984 non-null    float64
 20  V19         984 non-null    float64
 21  V20         984 non-null    float64
 22  V21         984 non-null    float64
 23  V22         984 non-null    float64
 24  V23         984 non-null    float64
 25  V24         984 non-null    float64
 26  V25         984 non-null    float64
 27  V26         984 non-null    float64
 28  V27         984 non-null    float64
 29  V28         984 non-null    float64
 30  Amount      984 non-null    float64
 31  Class       984 non-null    int64
dtypes: float64(30), int64(2)
memory usage: 246.1 KB

The data contains all fraudulent transactions from the original dataset, and the same number of randomly selected non-fraudulent transactions.

total = len(data)
normal = len(data[data.Class == 0])
fraudulent = len(data[data.Class == 1])
fraud_percentage = round(fraudulent/total*100, 2)
print('Total number of transactions: {}'.format(total))
print('Number of normal transactions {}'.format(normal))
print('Number of fraudulent transactions: {}'.format(fraudulent))

Output:

Total number of transactions: 984
Number of normal transactions 492
Number of fraudulent transactions: 492

Here is how the data looks:

data.head()

Output:

     Unnamed: 0      Time         V1         V2         V3         V4         V5         V6         V7         V8  ...        V21        V22        V23        V24        V25        V26        V27        V28  Amount  Class
  0         121      77.0  -0.427191   0.745708   1.761811  -0.165130   0.058298  -0.213413   0.647323   0.073464  ...  -0.201681  -0.432070   0.013164   0.161606  -0.401310   0.047423   0.102549  -0.116571    9.12      0
  1      248296  153875.0  -0.613696   3.698772  -5.534941   5.620486   1.649263  -2.335145  -0.907188   0.706362  ...   0.319261  -0.471379  -0.075890  -0.667909  -0.642848   0.070600   0.488410   0.292345    0.00      1
  2         239     160.0   1.171439   0.474974   0.011761   1.264303   0.116234  -0.865986   0.554393  -0.276375  ...   0.070051   0.278843  -0.097491   0.426278   0.744938  -0.274728   0.008472   0.015492   20.00      0
  3      239501  150139.0  -6.682832  -2.714268  -5.774530   1.449792  -0.661836  -1.148650   0.849686   0.433427  ...   0.220526   1.187013   0.335821   0.215683   0.803110   0.044033  -0.054988   0.082337  237.26      1
  4      143336   85285.0  -6.713407   3.921104  -9.746678   5.148263  -5.151563  -2.099389  -5.937767   3.578780  ...   0.954272  -0.451086   0.127214  -0.339450   0.394096   1.075295   1.649906  -0.394905  252.92      1

5 rows × 32 columns

Perform Feature Engineering Tasks

We will perform some simple data transformations:

  • order the rows
  • rename the columns to include the feature group prefix "CC1"
  • add a transaction id column "TxnId" and assign a unique generated value to it
  • select (drop the index column) and order columns
# rename the index column from the original dataset
data = data.rename(columns={"Unnamed: 0": "OldIdx"})
# order the rows by timestamp (original index order)
data = data.sort_values("OldIdx")
data.reset_index(drop=True, inplace=True)
data

Output:

         OldIdx      Time         V1         V2         V3         V4         V5         V6         V7         V8  ...        V21        V22        V23        V24        V25        V26        V27        V28  Amount  Class
  0           0       0.0  -1.359807  -0.072781   2.536347   1.378155  -0.338321   0.462388   0.239599   0.098698  ...  -0.018307   0.277838  -0.110474   0.066928   0.128539  -0.189115   0.133558  -0.021053  149.62      0
  1           1       0.0   1.191857   0.266151   0.166480   0.448154   0.060018  -0.082361  -0.078803   0.085102  ...  -0.225775  -0.638672   0.101288  -0.339846   0.167170   0.125895  -0.008983   0.014724    2.69      0
  2           2       1.0  -1.358354  -1.340163   1.773209   0.379780  -0.503198   1.800499   0.791461   0.247676  ...   0.247998   0.771679   0.909412  -0.689281  -0.327642  -0.139097  -0.055353  -0.059752  378.66      0
  3           3       1.0  -0.966272  -0.185226   1.792993  -0.863291  -0.010309   1.247203   0.237609   0.377436  ...  -0.108300   0.005274  -0.190321  -1.175575   0.647376  -0.221929   0.062723   0.061458  123.50      0
  4           4       2.0  -1.158233   0.877737   1.548718   0.403034  -0.407193   0.095921   0.592941  -0.270533  ...  -0.009431   0.798278  -0.137458   0.141267  -0.206010   0.502292   0.219422   0.215153   69.99      0
...         ...       ...        ...        ...        ...        ...        ...        ...        ...        ...  ...        ...        ...        ...        ...        ...        ...        ...        ...     ...    ...
979      279863  169142.0  -1.927883   1.125653  -4.518331   1.749293  -1.566487  -2.010494  -0.882850   0.697211  ...   0.778584  -0.319189   0.639419  -0.294885   0.537503   0.788395   0.292680   0.147968  390.00      1
980      280143  169347.0   1.378559   1.289381  -5.004247   1.411850   0.442581  -1.326536  -1.413170   0.248525  ...   0.370612   0.028234  -0.145640  -0.081049   0.521875   0.739467   0.389152   0.186637    0.76      1
981      280149  169351.0  -0.676143   1.126366  -2.213700   0.468308  -1.120541  -0.003346  -2.234739   1.210158  ...   0.751826   0.834108   0.190944   0.032070  -0.739695   0.471111   0.385107   0.194361   77.89      1
982      281144  169966.0  -3.113832   0.585864  -5.399730   1.817092  -0.840618  -2.943548  -2.208002   1.058733  ...   0.583276  -0.269209  -0.456108  -0.183659  -0.328168   0.606116   0.884876  -0.253700  245.00      1
983      281674  170348.0   1.991976   0.158476  -2.583441   0.408670   1.151147  -0.096695   0.223050  -0.068384  ...  -0.164350  -0.295135  -0.072173  -0.450261   0.313267  -0.289617   0.002988  -0.015309   42.53      1

984 rows × 32 columns

# rename the columns to include the feature group prefix "CC1"
curr_columns = data.columns
data = data.rename(columns=dict(zip(curr_columns, ["CC1_"+c for c in curr_columns])))
data

Output:

     CC1_OldIdx  CC1_Time     CC1_V1     CC1_V2     CC1_V3     CC1_V4     CC1_V5     CC1_V6     CC1_V7     CC1_V8  ...    CC1_V21    CC1_V22    CC1_V23    CC1_V24    CC1_V25    CC1_V26    CC1_V27    CC1_V28  CC1_Amount  CC1_Class
  0           0       0.0  -1.359807  -0.072781   2.536347   1.378155  -0.338321   0.462388   0.239599   0.098698  ...  -0.018307   0.277838  -0.110474   0.066928   0.128539  -0.189115   0.133558  -0.021053      149.62          0
  1           1       0.0   1.191857   0.266151   0.166480   0.448154   0.060018  -0.082361  -0.078803   0.085102  ...  -0.225775  -0.638672   0.101288  -0.339846   0.167170   0.125895  -0.008983   0.014724        2.69          0
  2           2       1.0  -1.358354  -1.340163   1.773209   0.379780  -0.503198   1.800499   0.791461   0.247676  ...   0.247998   0.771679   0.909412  -0.689281  -0.327642  -0.139097  -0.055353  -0.059752      378.66          0
  3           3       1.0  -0.966272  -0.185226   1.792993  -0.863291  -0.010309   1.247203   0.237609   0.377436  ...  -0.108300   0.005274  -0.190321  -1.175575   0.647376  -0.221929   0.062723   0.061458      123.50          0
  4           4       2.0  -1.158233   0.877737   1.548718   0.403034  -0.407193   0.095921   0.592941  -0.270533  ...  -0.009431   0.798278  -0.137458   0.141267  -0.206010   0.502292   0.219422   0.215153       69.99          0
...         ...       ...        ...        ...        ...        ...        ...        ...        ...        ...  ...        ...        ...        ...        ...        ...        ...        ...        ...         ...        ...
979      279863  169142.0  -1.927883   1.125653  -4.518331   1.749293  -1.566487  -2.010494  -0.882850   0.697211  ...   0.778584  -0.319189   0.639419  -0.294885   0.537503   0.788395   0.292680   0.147968      390.00          1
980      280143  169347.0   1.378559   1.289381  -5.004247   1.411850   0.442581  -1.326536  -1.413170   0.248525  ...   0.370612   0.028234  -0.145640  -0.081049   0.521875   0.739467   0.389152   0.186637        0.76          1
981      280149  169351.0  -0.676143   1.126366  -2.213700   0.468308  -1.120541  -0.003346  -2.234739   1.210158  ...   0.751826   0.834108   0.190944   0.032070  -0.739695   0.471111   0.385107   0.194361       77.89          1
982      281144  169966.0  -3.113832   0.585864  -5.399730   1.817092  -0.840618  -2.943548  -2.208002   1.058733  ...   0.583276  -0.269209  -0.456108  -0.183659  -0.328168   0.606116   0.884876  -0.253700      245.00          1
983      281674  170348.0   1.991976   0.158476  -2.583441   0.408670   1.151147  -0.096695   0.223050  -0.068384  ...  -0.164350  -0.295135  -0.072173  -0.450261   0.313267  -0.289617   0.002988  -0.015309       42.53          1

984 rows × 32 columns
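
The renaming step builds a dictionary from each existing column name to its prefixed form. A minimal plain-Python sketch of that mapping follows, using a shortened, illustrative column list; the helper name `prefix_columns` is hypothetical and not part of the notebook.

```python
# Sketch: build the old-name -> new-name mapping used for the rename.
# prefix_columns is a hypothetical helper, not part of the notebook's API.
def prefix_columns(columns, feature_group):
    """Map each column name to '<feature_group>_<column>'."""
    return {c: feature_group + "_" + c for c in columns}

# A shortened, illustrative subset of the dataset's columns.
sample_columns = ["OldIdx", "Time", "V1", "Amount", "Class"]
mapping = prefix_columns(sample_columns, "CC1")
print(mapping["Amount"])  # CC1_Amount
```

pandas also provides `DataFrame.add_prefix`, which should achieve the same renaming in a single call.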

Create a Spark dataframe to save data to Aerospike efficiently through the Aerospike Spark Connector.

sparkDF = spark.createDataFrame(data) 
sparkDF.printSchema()
sparkDF.toPandas()

Output:

root
|-- CC1_OldIdx: long (nullable = true)
|-- CC1_Time: double (nullable = true)
|-- CC1_V1: double (nullable = true)
|-- CC1_V2: double (nullable = true)
|-- CC1_V3: double (nullable = true)
|-- CC1_V4: double (nullable = true)
|-- CC1_V5: double (nullable = true)
|-- CC1_V6: double (nullable = true)
|-- CC1_V7: double (nullable = true)
|-- CC1_V8: double (nullable = true)
|-- CC1_V9: double (nullable = true)
|-- CC1_V10: double (nullable = true)
|-- CC1_V11: double (nullable = true)
|-- CC1_V12: double (nullable = true)
|-- CC1_V13: double (nullable = true)
|-- CC1_V14: double (nullable = true)
|-- CC1_V15: double (nullable = true)
|-- CC1_V16: double (nullable = true)
|-- CC1_V17: double (nullable = true)
|-- CC1_V18: double (nullable = true)
|-- CC1_V19: double (nullable = true)
|-- CC1_V20: double (nullable = true)
|-- CC1_V21: double (nullable = true)
|-- CC1_V22: double (nullable = true)
|-- CC1_V23: double (nullable = true)
|-- CC1_V24: double (nullable = true)
|-- CC1_V25: double (nullable = true)
|-- CC1_V26: double (nullable = true)
|-- CC1_V27: double (nullable = true)
|-- CC1_V28: double (nullable = true)
|-- CC1_Amount: double (nullable = true)
|-- CC1_Class: long (nullable = true)

Output:

     CC1_OldIdx  CC1_Time     CC1_V1     CC1_V2     CC1_V3     CC1_V4     CC1_V5     CC1_V6     CC1_V7     CC1_V8  ...    CC1_V21    CC1_V22    CC1_V23    CC1_V24    CC1_V25    CC1_V26    CC1_V27    CC1_V28  CC1_Amount  CC1_Class
  0           0       0.0  -1.359807  -0.072781   2.536347   1.378155  -0.338321   0.462388   0.239599   0.098698  ...  -0.018307   0.277838  -0.110474   0.066928   0.128539  -0.189115   0.133558  -0.021053      149.62          0
  1           1       0.0   1.191857   0.266151   0.166480   0.448154   0.060018  -0.082361  -0.078803   0.085102  ...  -0.225775  -0.638672   0.101288  -0.339846   0.167170   0.125895  -0.008983   0.014724        2.69          0
  2           2       1.0  -1.358354  -1.340163   1.773209   0.379780  -0.503198   1.800499   0.791461   0.247676  ...   0.247998   0.771679   0.909412  -0.689281  -0.327642  -0.139097  -0.055353  -0.059752      378.66          0
  3           3       1.0  -0.966272  -0.185226   1.792993  -0.863291  -0.010309   1.247203   0.237609   0.377436  ...  -0.108300   0.005274  -0.190321  -1.175575   0.647376  -0.221929   0.062723   0.061458      123.50          0
  4           4       2.0  -1.158233   0.877737   1.548718   0.403034  -0.407193   0.095921   0.592941  -0.270533  ...  -0.009431   0.798278  -0.137458   0.141267  -0.206010   0.502292   0.219422   0.215153       69.99          0
...         ...       ...        ...        ...        ...        ...        ...        ...        ...        ...  ...        ...        ...        ...        ...        ...        ...        ...        ...         ...        ...
979      279863  169142.0  -1.927883   1.125653  -4.518331   1.749293  -1.566487  -2.010494  -0.882850   0.697211  ...   0.778584  -0.319189   0.639419  -0.294885   0.537503   0.788395   0.292680   0.147968      390.00          1
980      280143  169347.0   1.378559   1.289381  -5.004247   1.411850   0.442581  -1.326536  -1.413170   0.248525  ...   0.370612   0.028234  -0.145640  -0.081049   0.521875   0.739467   0.389152   0.186637        0.76          1
981      280149  169351.0  -0.676143   1.126366  -2.213700   0.468308  -1.120541  -0.003346  -2.234739   1.210158  ...   0.751826   0.834108   0.190944   0.032070  -0.739695   0.471111   0.385107   0.194361       77.89          1
982      281144  169966.0  -3.113832   0.585864  -5.399730   1.817092  -0.840618  -2.943548  -2.208002   1.058733  ...   0.583276  -0.269209  -0.456108  -0.183659  -0.328168   0.606116   0.884876  -0.253700      245.00          1
983      281674  170348.0   1.991976   0.158476  -2.583441   0.408670   1.151147  -0.096695   0.223050  -0.068384  ...  -0.164350  -0.295135  -0.072173  -0.450261   0.313267  -0.289617   0.002988  -0.015309       42.53          1

984 rows × 32 columns
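
The schema printed above reflects how Spark infers column types from the pandas dataframe: 64-bit integers become `long` and 64-bit floats become `double`. The sketch below reproduces a few of the printed schema lines from a small dtype table. The table and the function name `schema_lines` are illustrative assumptions, not Spark's or the connector's internal code.

```python
# Hand-written, illustrative table of pandas dtype names and the Spark SQL
# type names that appear in the printSchema() output.
PANDAS_TO_SPARK = {"int64": "long", "float64": "double"}

def schema_lines(dtypes):
    """Render (column, pandas dtype) pairs in a printSchema()-like form."""
    lines = ["root"]
    for column, dtype in dtypes:
        lines.append(" |-- {}: {} (nullable = true)".format(column, PANDAS_TO_SPARK[dtype]))
    return lines

# A subset of the notebook's columns with their pandas dtypes.
columns = [("CC1_OldIdx", "int64"), ("CC1_Time", "float64"),
           ("CC1_Amount", "float64"), ("CC1_Class", "int64")]
for line in schema_lines(columns):
    print(line)
```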

Add an identifier column TxnId. Generate and assign a unique value to it.

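# add a new column TxnId for transaction id
# define a udf to update the new TxnId column

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

curr_txn_id = 1
def get_txn_id():
    # return the next id as a string and advance the counter
    # note: each Python worker process keeps its own copy of the counter,
    # so this simple scheme assumes a single local worker as in this demo
    global curr_txn_id
    txn_id = str(curr_txn_id)
    curr_txn_id += 1
    return txn_id
txn_id_udf = F.UserDefinedFunction(get_txn_id, StringType())
sparkDF = sparkDF.withColumn('TxnId', txn_id_udf())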

# select the needed columns
sparkDF = sparkDF.select(['TxnId','CC1_Class','CC1_Amount']+['CC1_V'+str(i) for i in range(1,29)])
sparkDF.show(3, truncate=3)

Output:

+-----+---------+----------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|TxnId|CC1_Class|CC1_Amount|CC1_V1|CC1_V2|CC1_V3|CC1_V4|CC1_V5|CC1_V6|CC1_V7|CC1_V8|CC1_V9|CC1_V10|CC1_V11|CC1_V12|CC1_V13|CC1_V14|CC1_V15|CC1_V16|CC1_V17|CC1_V18|CC1_V19|CC1_V20|CC1_V21|CC1_V22|CC1_V23|CC1_V24|CC1_V25|CC1_V26|CC1_V27|CC1_V28|
+-----+---------+----------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| 1| 0| 149| -1.| -0.| 2.5| 1.3| -0.| 0.4| 0.2| 0.0| 0.3| 0.0| -0.| -0.| -0.| -0.| 1.4| -0.| 0.2| 0.0| 0.4| 0.2| -0.| 0.2| -0.| 0.0| 0.1| -0.| 0.1| -0.|
| 2| 0| 2.6| 1.1| 0.2| 0.1| 0.4| 0.0| -0.| -0.| 0.0| -0.| -0.| 1.6| 1.0| 0.4| -0.| 0.6| 0.4| -0.| -0.| -0.| -0.| -0.| -0.| 0.1| -0.| 0.1| 0.1| -0.| 0.0|
| 3| 0| 378| -1.| -1.| 1.7| 0.3| -0.| 1.8| 0.7| 0.2| -1.| 0.2| 0.6| 0.0| 0.7| -0.| 2.3| -2.| 1.1| -0.| -2.| 0.5| 0.2| 0.7| 0.9| -0.| -0.| -0.| -0.| -0.|
+-----+---------+----------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
only showing top 3 rows
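
A counter held in a Python global is kept separately by each Spark Python worker process, so it yields unique values only when a single worker processes all rows. One common alternative is Spark's built-in `monotonically_increasing_id()`, which, per its documentation, places the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The plain-Python sketch below only simulates that layout to show why the resulting IDs stay unique across partitions; the function name `simulated_id` is hypothetical.

```python
# Simulate the documented bit layout of Spark's monotonically_increasing_id():
# partition ID in the upper 31 bits, record number in the lower 33 bits.
# simulated_id is a hypothetical helper for illustration only.
def simulated_id(partition_id, record_number):
    return (partition_id << 33) | record_number

# Two partitions of three rows each; a per-partition counter alone would
# repeat 0, 1, 2 in both partitions.
ids = [simulated_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

In the notebook this would correspond to something like `sparkDF.withColumn('TxnId', F.monotonically_increasing_id().cast('string'))`. The IDs produced this way are unique but not consecutive, so the TxnId values shown in this notebook's outputs would differ.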

Save Features to Feature Store

We are ready to save the features in the Spark dataframe to Aerospike.

Of the following three steps, the first two steps are needed only the first time when the feature group and features are created. Subsequent updates will use only the third step.

  1. Create a feature group.
  2. Create feature metadata.
  3. Save feature values in entity records.
# 1. Create a feature group.
FG_NAME = 'CC1'
FG_DESCRIPTION = 'Credit card transaction data'
FG_SOURCE = 'European cardholder dataset from Kaggle'
fg = FeatureGroup(FG_NAME, FG_DESCRIPTION, FG_SOURCE,
                  attrs={'entity':'cctxn', 'class':'fraud'}, tags=['kaggle', 'demo'])
fg.save()

# 2. Create feature metadata
FEATURE_AMOUNT = 'Amount'
f = Feature(FG_NAME, FEATURE_AMOUNT, 'double', "Transaction amount",
            attrs={'entity':'cctxn'}, tags=['usd'])
f.save()
FEATURE_CLASS = 'Class'
f = Feature(FG_NAME, FEATURE_CLASS, 'integer', "Label indicating fraud or not",
            attrs={'entity':'cctxn'}, tags=['label'])
f.save()
FEATURE_PCA_XFORM = "V"
for i in range(1,29):
    f = Feature(FG_NAME, FEATURE_PCA_XFORM+str(i), 'double', "Transformed version of PCA",
                attrs={'entity':'cctxn'}, tags=['pca'])
    f.save()

# 3. Save feature values in entity records
ENTITY_TYPE = 'cctxn'
ID_COLUMN = 'TxnId'
Entity.saveDF(sparkDF, ENTITY_TYPE, ID_COLUMN)
print('Features stored to Feature Store.')

Output:

Features stored to Feature Store.
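
The dataframe columns follow the convention `<feature group>_<feature name>`, which ties each stored value back to its feature metadata. As a sketch of that convention, the following plain-Python check derives the column names expected for the CC1 feature group and confirms they match the columns selected when preparing `sparkDF`. The helper `expected_columns` is hypothetical; nothing in this notebook shows `Entity.saveDF` performing such a check.

```python
# Hypothetical helper: derive the expected dataframe column names from the
# ID column, the feature group name, and the registered feature names.
def expected_columns(id_column, feature_group, feature_names):
    return [id_column] + [feature_group + "_" + name for name in feature_names]

feature_names = ["Class", "Amount"] + ["V" + str(i) for i in range(1, 29)]
columns = expected_columns("TxnId", "CC1", feature_names)

# Same list as the select() call used when preparing sparkDF.
selected = ['TxnId', 'CC1_Class', 'CC1_Amount'] + ['CC1_V' + str(i) for i in range(1, 29)]
print(len(columns), columns == selected)  # 31 True
```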

Query Features

Let's issue a query against the saved features to get fraudulent transactions with amount greater than $2000. There appears to be only one such transaction in the demo data.

schema = StructType([StructField(ID_COLUMN, StringType(), False),
                     StructField(FG_NAME+'_'+FEATURE_CLASS, IntegerType(), False),
                     StructField(FG_NAME+'_'+FEATURE_AMOUNT, DoubleType(), False)])
for i in range(1,29):
    schema.add(FG_NAME+'_'+FEATURE_PCA_XFORM+str(i), DoubleType(), True)
queryDF = Entity.query(ENTITY_TYPE, 'CC1_Class == 1 and CC1_Amount > 2000', schema, ID_COLUMN)
queryDF.show(5, truncate=7)

Output:

+------+---------+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| TxnId|CC1_Class|CC1_Amount| CC1_V1| CC1_V2| CC1_V3| CC1_V4| CC1_V5| CC1_V6| CC1_V7| CC1_V8| CC1_V9|CC1_V10|CC1_V11|CC1_V12|CC1_V13|CC1_V14|CC1_V15|CC1_V16|CC1_V17|CC1_V18|CC1_V19|CC1_V20|CC1_V21|CC1_V22|CC1_V23|CC1_V24|CC1_V25|CC1_V26|CC1_V27|CC1_V28|
+------+---------+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|176050| 1| 2125.87|-2.0...|-7.1...|-4.0...|1.30...|-2.0...|-0.0...|2.88...|-0.7...|1.46...|-1.5...|-1.3...|-0.2...|-1.5...|1.07...|0.38...|-0.6...|0.09...|0.33...|0.05...|3.97...|1.24...|-1.0...|-1.8...|0.65...|-0.4...|-0.8...|-0.3...|0.31...|
+------+---------+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
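
The filter string combines two conditions with `and`, so a record qualifies only if it is labeled fraudulent and its amount exceeds 2000. The plain-Python sketch below applies the same predicate to three records. The amounts and labels are taken from rows displayed in this notebook, while the `txn-a` and `txn-b` keys are placeholders because the TxnId values of those rows are not shown.

```python
# Three records: the first is the query result shown above; the other two use
# amounts and labels from the displayed dataframe with placeholder keys.
records = [
    {"TxnId": "176050", "CC1_Class": 1, "CC1_Amount": 2125.87},
    {"TxnId": "txn-a", "CC1_Class": 0, "CC1_Amount": 149.62},
    {"TxnId": "txn-b", "CC1_Class": 1, "CC1_Amount": 390.00},
]

def is_large_fraud(record, threshold=2000):
    """Plain-Python equivalent of 'CC1_Class == 1 and CC1_Amount > 2000'."""
    return record["CC1_Class"] == 1 and record["CC1_Amount"] > threshold

matches = [r["TxnId"] for r in records if is_large_fraud(r)]
print(matches)  # ['176050']
```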

Examine the data through aql with the following command:

!aql -c "select * from test.cctxn-features where PK='176050'"

Output:

select * from test.cctxn-features where PK='176050'
+------------+-----------+-------------------+-------------------+-------------------+--------------------+-------------------+-----------------+-------------------+--------------------+--------------------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+-----------------+----------+
| CC1_Amount | CC1_Class | CC1_V1 | CC1_V10 | CC1_V11 | CC1_V12 | CC1_V13 | CC1_V14 | CC1_V15 | CC1_V16 | CC1_V17 | CC1_V18 | CC1_V19 | CC1_V2 | CC1_V20 | CC1_V21 | CC1_V22 | CC1_V23 | CC1_V24 | CC1_V25 | CC1_V26 | CC1_V27 | CC1_V28 | CC1_V3 | CC1_V4 | CC1_V5 | CC1_V6 | CC1_V7 | CC1_V8 | CC1_V9 | TxnId |
+------------+-----------+-------------------+-------------------+-------------------+--------------------+-------------------+-----------------+-------------------+--------------------+--------------------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+-----------------+----------+
| 2125.87 | 1 | -2.00345953080582 | -1.53160798206082 | -1.39432826167269 | -0.220718797789479 | -1.53099043146804 | 1.0752476539262 | 0.388383209307268 | -0.660655352312646 | 0.0933209955444861 | 0.335742221574637 | 0.0575510393537501 | -7.15904171709445 | 3.97321702726744 | 1.24428677489095 | -1.01523228673153 | -1.80098486605048 | 0.657585626965743 | -0.435617246752788 | -0.894508922176968 | -0.39755738695085 | 0.314261714087509 | -4.05097631587393 | 1.30957974749918 | -2.05810158798669 | -0.0986209270722274 | 2.88008272715204 | -0.727484046608914 | 1.4603805509699 | "176050" |
+------------+-----------+-------------------+-------------------+-------------------+--------------------+-------------------+-----------------+-------------------+--------------------+--------------------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+-----------------+----------+
1 row in set (0.001 secs)

OK
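
In the aql output above, the bins appear in lexicographic (string) order, which is why `CC1_V10` follows `CC1_V1` and `CC1_V2` comes after `CC1_V19`. If the numeric feature order is easier to read, a natural sort key can restore it. The following is a small sketch in plain Python; `natural_key` is a hypothetical helper.

```python
import re

def natural_key(name):
    """Split a name into text and integer parts so 'V2' sorts before 'V10'."""
    return [int(part) if part.isdigit() else part for part in re.split(r"(\d+)", name)]

bins = ["CC1_V" + str(i) for i in range(1, 29)]
lexicographic = sorted(bins)                 # order seen in the aql output
natural = sorted(bins, key=natural_key)      # numeric feature order
print(lexicographic[:4])  # ['CC1_V1', 'CC1_V10', 'CC1_V11', 'CC1_V12']
print(natural[:4])        # ['CC1_V1', 'CC1_V2', 'CC1_V3', 'CC1_V4']
```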

Example: Salary Data

Let's look at another example of feature store use for data engineering, which is taken from another Aerospike Spark notebook.

We will illustrate use of the feature store for Feature Engineering through the following sequence:

  • Generate demo data.
  • Perform feature engineering tasks.
  • Save the engineered features to Feature Store.

Generate Demo Data

# We create age vs salary data, using three different Gaussian distributions
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import math

# Make sure we get the same results every time this workbook is run
# Otherwise we are occasionally exposed to results not working out as expected
np.random.seed(12345)

# Create covariance matrix from std devs + correlation
def covariance_matrix(std_dev_1,std_dev_2,correlation):
    return [[std_dev_1 ** 2, correlation * std_dev_1 * std_dev_2],
            [correlation * std_dev_1 * std_dev_2, std_dev_2 ** 2]]

# Return a bivariate sample given means/std dev/correlation
def age_salary_sample(distribution_params,sample_size):
    mean = [distribution_params["age_mean"], distribution_params["salary_mean"]]
    cov = covariance_matrix(distribution_params["age_std_dev"],distribution_params["salary_std_dev"],
                            distribution_params["age_salary_correlation"])
    return np.random.multivariate_normal(mean, cov, sample_size).T

# Define the characteristics of our age/salary distribution
age_salary_distribution_1 = {"age_mean":25,"salary_mean":50000,
                             "age_std_dev":1,"salary_std_dev":5000,"age_salary_correlation":0.3}

age_salary_distribution_2 = {"age_mean":45,"salary_mean":80000,
                             "age_std_dev":4,"salary_std_dev":8000,"age_salary_correlation":0.7}

age_salary_distribution_3 = {"age_mean":35,"salary_mean":70000,
                             "age_std_dev":2,"salary_std_dev":9000,"age_salary_correlation":0.1}

distribution_data = [age_salary_distribution_1,age_salary_distribution_2,age_salary_distribution_3]

# Sample age/salary data for each distribution
sample_size_1 = 100
sample_size_2 = 120
sample_size_3 = 80
sample_sizes = [sample_size_1,sample_size_2,sample_size_3]
group_1_ages,group_1_salaries = age_salary_sample(age_salary_distribution_1,sample_size=sample_size_1)
group_2_ages,group_2_salaries = age_salary_sample(age_salary_distribution_2,sample_size=sample_size_2)
group_3_ages,group_3_salaries = age_salary_sample(age_salary_distribution_3,sample_size=sample_size_3)

ages=np.concatenate([group_1_ages,group_2_ages,group_3_ages])
salaries=np.concatenate([group_1_salaries,group_2_salaries,group_3_salaries])

print("Data created")

Output:

Data created
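
The `covariance_matrix` helper converts two standard deviations and a correlation into a 2x2 covariance matrix: the diagonal holds the variances, and each off-diagonal entry is the correlation multiplied by both standard deviations. As a worked example, the sketch below restates the helper and evaluates it for the first distribution (age standard deviation 1, salary standard deviation 5000, correlation 0.3).

```python
def covariance_matrix(std_dev_1, std_dev_2, correlation):
    """Build a 2x2 covariance matrix from two std devs and their correlation."""
    covariance = correlation * std_dev_1 * std_dev_2
    return [[std_dev_1 ** 2, covariance],
            [covariance, std_dev_2 ** 2]]

# First distribution: variances 1 and 5000**2, covariance 0.3 * 1 * 5000.
cov = covariance_matrix(1, 5000, 0.3)
print(cov)
```

The age variance is 1, the salary variance is 25,000,000, and the covariance is about 1,500, so salary tends to rise only mildly with age within this group.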

Perform Feature Engineering Tasks

Display simulated age/salary data.

# Plot the sample data
group_1_colour, group_2_colour, group_3_colour ='red','blue', 'pink'
plt.xlabel('Age',fontsize=10)
plt.ylabel("Salary",fontsize=10)

plt.scatter(group_1_ages,group_1_salaries,c=group_1_colour,label="Group 1")
plt.scatter(group_2_ages,group_2_salaries,c=group_2_colour,label="Group 2")
plt.scatter(group_3_ages,group_3_salaries,c=group_3_colour,label="Group 3")

plt.legend(loc='upper left')
plt.show()

Output:

[Figure: scatter plot of Salary against Age for Group 1, Group 2, and Group 3]

# Turn the above records into a Data Frame
# First of all, create an array of arrays
inputBuf = []

for i in range(0, len(ages)) :
    id = i + 1 # Avoid counting from zero
    name = "Individual: {:03d}".format(id)
    # Note we need to make sure values are typed correctly
    # salary will have type numpy.float64 - if it is not cast as below, an error will be thrown
    age = float(ages[i])
    salary = int(salaries[i])
    inputBuf.append((id, name,age,salary))

# Convert to an RDD
inputRDD = spark.sparkContext.parallelize(inputBuf)

# Convert to a data frame using a schema
# Note the feature group SAL is prefixed to each feature column
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("SAL_name", StringType(), True),
    StructField("SAL_age", DoubleType(), True),
    StructField("SAL_salary",IntegerType(), True)
])

inputDF=spark.createDataFrame(inputRDD,schema)
inputDF.show(5)

Output:

+---+---------------+------------------+----------+
| id| SAL_name| SAL_age|SAL_salary|
+---+---------------+------------------+----------+
| 1|Individual: 001| 25.39547052370498| 48976|
| 2|Individual: 002|24.314035458986748| 47402|
| 3|Individual: 003|26.918958635987888| 59828|
| 4|Individual: 004| 25.29664106310324| 50464|
| 5|Individual: 005|26.419729731447458| 53845|
+---+---------------+------------------+----------+
only showing top 5 rows
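
Each input row is built from NumPy values that are cast to native Python types before Spark sees them. Note that `int()` truncates the fractional part of the salary rather than rounding it. A minimal sketch of that row construction follows, using plain floats in place of the NumPy values; `build_row` is a hypothetical helper and the sample inputs are illustrative.

```python
# Hypothetical helper mirroring the loop body that fills inputBuf.
def build_row(index, age, salary):
    person_id = index + 1                       # avoid counting from zero
    name = "Individual: {:03d}".format(person_id)  # zero-padded to 3 digits
    return (person_id, name, float(age), int(salary))  # int() truncates

row = build_row(0, 25.5, 48976.9)
print(row)  # (1, 'Individual: 001', 25.5, 48976)
```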

Save Features to Feature Store

  1. Create a feature group.
  2. Create feature metadata.
  3. Save feature values in entity records.
# 1. Create a feature group.
FG_NAME = 'SAL'
FG_DESCRIPTION = 'Age salary data'
FG_SOURCE = 'Generated demo data'
fg = FeatureGroup(FG_NAME, FG_DESCRIPTION, FG_SOURCE,
                  attrs={'access':'all'}, tags=['test'])
fg.save()

# 2. Create features metadata.
FEATURE_NAME = 'name'
f = Feature(FG_NAME, FEATURE_NAME, "string", "Name of the person",
            attrs={'unique':'no'}, tags=['test'])
f.save()

FEATURE_AGE = 'age'
f = Feature(FG_NAME, FEATURE_AGE, "double", "Age of the person",
            attrs={'range':'0-100'}, tags=['test'])
f.save()

FEATURE_SALARY = 'salary'
f = Feature(FG_NAME, FEATURE_SALARY, 'integer', "Salary of the person",
            attrs={'range':'20-999K'}, tags=['test'])
f.save()

# 3. Save feature values in entity records.
ENTITY_TYPE = 'user'
ID_COLUMN = 'id'
Entity.saveDF(inputDF, ENTITY_TYPE, ID_COLUMN)
print('Features stored to Feature Store.')

Output:

Features stored to Feature Store.

Query Features

Issue a query against the saved features to get a dataframe with records in age group 25 to 30. There are 50 such users in the dataset.

queryDF = Entity.query(ENTITY_TYPE, '`SAL_age` between 25 and 30', schema, ID_COLUMN)
print("Number of users of age between 25 and 30: {}".format(queryDF.count()))
queryDF.show(5)

Output:

Number of users of age between 25 and 30: 50
+---+---------------+------------------+----------+
| id| SAL_name| SAL_age|SAL_salary|
+---+---------------+------------------+----------+
| 85|Individual: 085|26.288287033377657| 59603|
| 1|Individual: 001| 25.39547052370498| 48976|
| 14|Individual: 014| 25.59043077849547| 51513|
| 79|Individual: 079|25.887490702675926| 48162|
| 4|Individual: 004| 25.29664106310324| 50464|
+---+---------------+------------------+----------+
only showing top 5 rows
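
In Spark SQL, `BETWEEN` includes both bounds, so the filter keeps ages from 25 up to and including 30. The sketch below applies the equivalent comparison in plain Python to the first five rows of the input dataframe shown earlier in this example, with ages rounded to three decimals.

```python
# (id, age) pairs for the first five individuals, ages rounded to 3 decimals.
people = [(1, 25.395), (2, 24.314), (3, 26.919), (4, 25.297), (5, 26.420)]

def in_age_range(age, low=25, high=30):
    """Inclusive range check, matching SQL BETWEEN semantics."""
    return low <= age <= high

selected_ids = [pid for pid, age in people if in_age_range(age)]
print(selected_ids)  # [1, 3, 4, 5]
```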

Examine the data through aql with the following command:

!aql -c "select * from test.user-features where PK=85"

Output:

select * from test.user-features where PK=85
+----+-------------------+-------------------+------------+
| id | SAL_age | SAL_name | SAL_salary |
+----+-------------------+-------------------+------------+
| 85 | 26.28828703337766 | "Individual: 085" | 59603 |
+----+-------------------+-------------------+------------+
1 row in set (0.001 secs)

OK
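
The two aql commands in this notebook differ in how the primary key is written: the credit card example quotes the key because `TxnId` is a string, while the salary example passes `id` as a bare integer. Both read from a set named after the entity type with a `-features` suffix in the `test` namespace. The sketch below composes such a statement from those observations. `aql_select` is a hypothetical helper, and the set naming is inferred from the two commands rather than from the feature store implementation.

```python
# Hypothetical helper: compose the aql statement used to inspect one record.
def aql_select(namespace, entity_type, key):
    """String keys are quoted; integer keys are written as bare numbers."""
    key_literal = "'{}'".format(key) if isinstance(key, str) else str(key)
    return "select * from {}.{}-features where PK={}".format(
        namespace, entity_type, key_literal)

print(aql_select("test", "cctxn", "176050"))
print(aql_select("test", "user", 85))
```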

Takeaways and Conclusion

In this notebook, we explored how Aerospike can be used as a Feature Store for ML applications. Specifically, we showed how features engineered on the Spark platform can be stored efficiently in an Aerospike feature store through the Aerospike Spark Connector. We implemented a simple example feature store interface that leverages the connector's capabilities, and used that API to save and query features created in two data engineering examples.

This is the first in a series of notebooks on how Aerospike can be used as a feature store. Subsequent notebooks in the series will explore the use of the Aerospike Feature Store for Model Training and Model Serving.

Cleaning Up

Close the Spark session by executing the cell below. To also remove the tutorial data, uncomment the truncate command in the cell before running it.

try:
    spark.stop()
except Exception:
    pass # ignore
# To remove all data in the namespace test, uncomment the following line and run:
#!aql -c "truncate test"

Further Exploration and Resources

Here are some links for further exploration.

Resources

Exploring Other Notebooks

Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload.