aerolookup
aerolookup allows rapid lookups of records that correspond to a set of keys loaded into a Spark DataFrame (DF), whether streaming or static.
It is easy to use and is also available in the Python, Scala, and Java client libraries.
It also benefits from the performance of Aerospike's internal batch-read calls.
Follow the tutorial "Reading from and Writing to DataFrames" to load the records of an Aerospike set into a Spark DataFrame.
/***
* The aerolookup API uses the contents of a Spark DataFrame column to perform large-scale lookups
* in the Aerospike database. The columns specified in `outputSchema` are used to retrieve the relevant
* bins from the Aerospike database and to construct the columns of the resulting DataFrame. All properties
* related to the batch-get read can be passed as `inputConf`. The contents of `inputConf` are merged with
* the Aerospike properties present in the DataFrame's Spark session runtime properties to populate the
* Aerospike-specific properties for the read operation. Aerospike-specific properties such as `aerospike.seedhost` or
* `aerospike.transaction.rate` can be passed through this map. To set
* meta columns such as `aerospike.keyColumn` or `aerospike.digestColumn`, the corresponding columns must be present
* in `outputSchema` and the properties must be passed in `inputConf`. This is necessary because the connector needs
* to know which columns in the output schema should be populated with the metadata values retrieved from the database.
*/
@param dataFrame - the Spark DataFrame; it can be streaming or static.
@param keyCol - the column of the DataFrame used to construct the primary key for the lookup.
@param set - the set name in the database.
@param outputSchema - the schema of the resulting DataFrame. This schema populates
the fields of the resulting DataFrame.
@param namespace - the namespace of the `set`.
@param inputConf - a map of properties. These properties are merged with the Aerospike properties
extracted from the Spark runtime config.
@return DataFrame - the resulting DataFrame after aerolookup.
//Scala
aerolookup(dataFrame: DataFrame,
           keyCol: String,
           set: String,
           outputSchema: StructType,
           namespace: String,
           inputConf: Map[String, Any] = Map[String, Any]()): DataFrame
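As an illustration, here is a minimal, hypothetical call that also retrieves each record's primary key. It assumes an input DataFrame `df` with a key column `userId`, a set `users` in namespace `test`, and that the key meta column is named `__key`; all of these names are placeholders, not part of the API.
//Scala
import org.apache.spark.sql.types._

//the meta column "__key" must appear in outputSchema and be named via aerospike.keyColumn in inputConf
val lookupSchema = StructType(Seq(
  StructField("__key", StringType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

val result = aerolookup(
  df,                                      //input DataFrame (streaming or static)
  "userId",                                //column used to construct the primary key
  "users",                                 //set name
  lookupSchema,                            //schema of the resulting DataFrame
  "test",                                  //namespace
  Map("aerospike.keyColumn" -> "__key")    //tells the connector which output column receives the key
)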
In the Python version of the API, `outputSchema` is the JSON schema of the desired DataFrame. The remaining parameters have the same types as in the Scala API above. The following example illustrates how to invoke aerolookup in Python applications.
#Python
#sc is the SparkContext and spark is the SparkSession object
#access the connector's Python utility through the JVM gateway
scala_object = sc._jvm.com.aerospike.spark.PythonUtil
#get the Java object representing the resulting dataframe
gateway_df = scala_object.aerolookup(dataframe._jdf, keyCol, set, outputSchemaAsJson, namespace, {})
#convert the Java object to a Python dataframe
aerolookup_df = pyspark.sql.DataFrame(gateway_df, spark._wrapped)
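One way to produce `outputSchemaAsJson` is to build a PySpark `StructType` and serialize it with its `json()` method; this sketch assumes the connector accepts Spark's own JSON schema representation, and the columns mirror those used in the Scala example below.
#Python
from pyspark.sql.types import StructType, StructField, StringType, LongType

#hypothetical output schema; StructType.json() serializes it to Spark's JSON
#schema format, which is assumed here to be the form expected by aerolookup
output_schema = StructType([
    StructField("col2", StringType(), False),
    StructField("col3", LongType(), False),
])
outputSchemaAsJson = output_schema.json()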
Here's an example of how to use the aerolookup function. First, we save some sample data in the Aerospike database:
import com.aerospike.spark._
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer

val setName = "aerolookup"
val writeSchema = StructType(Seq(
  StructField("col1", IntegerType, false),
  StructField("col2", StringType, false),
  StructField("col3", LongType, false)
))

val inputDF = {
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to 10000) {
    val col1 = i
    val col2 = i.toString
    val r = Row(col1, col2, 2 * i.toLong)
    inputBuf.append(r)
  }
  val inputRDD = sc.parallelize(inputBuf)
  spark.createDataFrame(inputRDD, writeSchema)
}
inputDF.write
.mode(SaveMode.Append)
.format("aerospike")
.option("aerospike.writeset", setName)
.option("aerospike.updateByKey", "col1")
.save()
Next, we look up records in the namespace `test` and set `aerolookup` by using column `col1` of the DataFrame `dfInSparkMemory`.
//schema of the output dataframe
val outputSchema = StructType(Seq(
  StructField("col2", StringType, false),
  StructField("col3", LongType, false)))

import spark.implicits._
val keys = 1 to 1000
val dfInSparkMemory: DataFrame = keys.toDF("col1")
val resultantDF = aerolookup(dfInSparkMemory, "col1", setName, outputSchema, "test")
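The result is a regular DataFrame, so you can inspect it with standard DataFrame operations, for example:
//Scala
resultantDF.show(5)                                   //preview a few looked-up records
println(s"Looked up ${resultantDF.count()} records")  //expect up to 1000 matches for keys 1 to 1000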
You can also set other read-related flags, such as `aerospike.pushdown.expressions`, `aerospike.transaction.rate`, and `aerospike.schema.flexible`, as key-value pairs in the SparkConf object so that they apply to the aerolookup API.
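For example, a minimal sketch; the property values shown here are placeholders, not recommendations:
//Scala
spark.conf.set("aerospike.transaction.rate", "5000")  //illustrative read-rate cap
spark.conf.set("aerospike.schema.flexible", "true")   //illustrative schema-flexibility setting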
Stream processing using aerolookup
import org.apache.spark.sql.functions.sum

spark.conf.set("aerospike.seedhost", "127.0.0.1")
val simpleschema: StructType = new StructType(
  Array(
    StructField("id", IntegerType, nullable = false),
    StructField("rate_code_id", LongType, nullable = true)
  ))

//streaming source; allParams is an application-specific parameter map (the read format defaults to CSV here)
val streamreader = spark
  .readStream
  .schema(simpleschema)
  .format(allParams.getOrElse("readformat", "csv"))
  .load("s3-bucket-receiving-streaming-data")
//stream processing using aerolookup
val streamWriter = streamreader.writeStream.foreachBatch((outputDf: DataFrame, bid: Long) => {
  val lookupDF = outputDf.select("id", "rate_code_id").where("id is not null")
    .groupBy("id").agg(sum("rate_code_id").alias("rate_code_id"))
  val aerospikeBatchReadDf = aerolookup(
    lookupDF, //streaming dataframe
    "id", //key column of the streaming dataframe used for the lookup into the database
    "nyc", //set name
    simpleschema, //schema of the output dataframe
    "test", //namespace
    //config map; note that aerospike.keyColumn must be set if you want
    //the lookup key data to be present in the output dataframe
    Map("aerospike.log.level" -> "info", "aerospike.timeout" -> 864000,
      "aerospike.sockettimeout" -> 864000, "aerospike.keyColumn" -> "id")
  )
//do some processing
val lookupRdd = lookupDF.rdd.map(row => {
if (Option(row.get(0)).isEmpty || Option(row.get(1)).isEmpty) {
logWarning(s"lookupDF row with null : ${row.get(0)} row 2: ${row.get(1)}")
}
(row.getInt(0), row.getLong(1))
})
val aerospikeBatchReadRDD = aerospikeBatchReadDf.rdd
val aerospikeRdd = aerospikeBatchReadRDD.map(row => {
if(Option(row.get(0)).isEmpty || Option(row.get(1)).isEmpty){
logWarning(s"aerospikeRdd row with null : ${row.get(0)} row 2: ${row.get(1)}")
}
(row.getInt(0), row.getLong(1))
})
val finalRdd = lookupRdd
  .fullOuterJoin(aerospikeRdd)
  .map(row => {
    val key = row._1
    val impressions_1: Long = row._2._1.getOrElse(0L)
    val impressions_2: Long = row._2._2.getOrElse(0L)
    val impressions = impressions_1 + impressions_2
    Row(key, impressions)
  })
//write the processed data to the DB
val aerospikeWriteDf = spark.createDataFrame(finalRdd, simpleschema)
aerospikeWriteDf.write.mode(SaveMode.Overwrite)
.format("aerospike")
.option("aerospike.writeset", "aerospike_spark_agg")
.option("aerospike.updateByKey", "id")
.option("aerospike.sendKey", "true")
.save()
}).start()
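The `start()` call returns a `StreamingQuery`; in a standalone application you would typically block until the query terminates:
//Scala
streamWriter.awaitTermination()  //standard Structured Streaming pattern: block until the query stops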