Implementing SQL Operations: SELECT
For an interactive Jupyter notebook experience:
This tutorial describes how to implement certain SQL SELECT statements in Aerospike.
This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
Introduction
In this notebook, we will see how specific SELECT statements in SQL can be implemented in Aerospike.
SQL is a widely known data access language. If you have used SQL, the examples in this notebook will make it easier to implement specific SQL SELECT statements.
This notebook is the first in the SQL Operations series that consists of the following notebooks:
- Implementing SQL Operations: SELECT (this notebook)
- Implementing SQL Operations: UPDATE, CREATE, and DELETE
- Implementing SQL Operations: Aggregates
The specific topics and SQL SELECT statements we discuss include:
- Components of the SELECT statement
- Single record retrieval
- Batch retrieval
- Predicate based retrieval
- Query operation using an index
- Query operation using an expression filter
- Scan operation using an expression filter
- Computed fields
The purpose of this notebook is to provide Aerospike API equivalents for specific SQL operations. Not all SQL operations (such as JOIN) are directly supported in Aerospike API. The SQL syntax in this notebook while largely accurate is meant only to convey the semantics of the operations. Check out Aerospike Presto Connector for ad-hoc SQL access to Aerospike data.
While Aerospike provides both synchronous and asynchronous execution modes for many operations, we use mostly synchronous execution examples in this notebook, leaving asynchronous execution as a separate topic for a future tutorial.
Prerequisites
This tutorial assumes familiarity with the following topics:
Initialization
Ensure database is running
This notebook requires that Aerospike database is running.
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd
Download and install additional components.
Install the Java client.
%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>5.0.0</version>
</dependency>
</dependencies>
Connect to database and populate test data
The test data has ten records with user-key "id-1" through "id-10", two integer bins (fields) "bin1" and "bin2", in the namespace "test" and sets "sql-select-small"and null, and similarly structured 1000 records in set "sql-select-large".
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");
String Namespace = "test";
String SmallSet = "sql-select-small";
String LargeSet = "sql-select-large";
String NullSet = "";
WritePolicy wpolicy = new WritePolicy();
wpolicy.sendKey = true;
for (int i = 1; i <= 10; i++) {
Key key = new Key(Namespace, SmallSet, "id-"+i);
Bin bin1 = new Bin(new String("bin1"), i);
Bin bin2 = new Bin(new String("bin2"), 1000+i);
client.put(wpolicy, key, bin1, bin2);
}
for (int i = 1; i <= 10; i++) {
Key key = new Key(Namespace, NullSet, "id-"+i);
Bin bin1 = new Bin(new String("bin1"), i);
Bin bin2 = new Bin(new String("bin2"), 1000+i);
client.put(wpolicy, key, bin1, bin2);
}
for (int i = 1; i <= 1000; i++) {
Key key = new Key(Namespace, LargeSet, "id-"+i);
Bin bin1 = new Bin(new String("bin1"), i);
Bin bin2 = new Bin(new String("bin2"), 1000+i);
client.put(wpolicy, key, bin1, bin2);
}
System.out.format("Test data populated");;
Output:
Initialized the client and connected to the cluster.
Test data populated
Mapping Components of SELECT Statement
Columns and tables
In Aerospike, a relational database or schema maps to a namespace, a
table maps to a set, and a column maps to a bin. Thus a query
SELECT columns FROM table WHERE condition
can be written in Aerospike
terminology as SELECT bins FROM namespace.set WHERE condition
.
Record id
Records are stored in a namespace, organized in sets, and each record is uniquely identified by a key or id of the record that consists of a triple: (namespace, set, user-key) where user-key is a unique user specified id within the set. The key is closely identified with a record, and can be seen either as a metadata or a primary key field, and is returned in all retrieval APIs.
Record metadata
Each record has generation (or version) and expiration (or time-to-live in seconds) associated with it. This metadata is returned in all retrieval operations. It is possible to retrieve only the metadata without the record's bins through "getHeader" operation explained below.
A note on Policy
All APIs take a Policy argument. A policy specifies many request parameters such as timeout and maximum retries, as well as operations modifiers such as an expression filter.
Single Record Get
Let's start with a simple example of a single record retrieval using its key. You can either get the entire record or specific bins.
SELECT * FROM namespace.set WHERE id = key
Record Client::get(Policy policy, Key key)
SELECT bins FROM namsepace.set WHERE id = key
Record Client::get(Policy policy, Key key, String... binNames)
import com.aerospike.client.Record;
// Read all bins of the record with user-key "id-3" from the small set
Key key = new Key(Namespace, SmallSet, "id-3");
// retrieve all bins in the record
Record record = client.get(null, key);
System.out.format("set='%s' key=%s bins=%s\n", key.setName, key.userKey, record.bins);
// Read "bin2" for the record with user-key "id-4" from the null set
Key key = new Key(Namespace, NullSet, "id-4");
// retrieve only "bin2"
Record record = client.get(null, key, "bin2");
System.out.format("set='%s' key=%s bins=%s\n", key.setName, key.userKey, record.bins);;
Output:
set='sql-select-small' key=id-3 bins={bin1=3, bin2=1003}
set='' key=id-4 bins={bin2=1004}
Single Record Existence
There is a variant of single record retrieval to check a record's existence.
SELECT EXISTS(SELECT * FROM namespace.set WHERE id = key)
boolean Client::exists(Policy policy, Key key)
// Check existence of the record with user-key "id-3" in the small set
Key key = new Key(Namespace, SmallSet, "id-3");
boolean yesNo = client.exists(null, key);
System.out.format("key=%s exists=%s\n", key.userKey, yesNo);
// Check existence of the record with user-key "no-such-key" in the small set
Key key = new Key(Namespace, SmallSet, "no-such-key");
boolean yesNo = client.exists(null, key);
System.out.format("key=%s exists=%s\n", key.userKey, yesNo);;
Output:
key=id-3 exists=true
key=no-such-key exists=false
Single Record Metadata
It is possible to only obtain a record's header info or metadata, consisting of generation (or version) and expiration time (time-to-live in seconds).
SELECT generation, expiration FROM namespace.set WHERE id = key
Record Client::getHeader(Policy policy, Key key)
// Get the metadata for the record with user-key "id-3" in the small set
Key key = new Key(Namespace, SmallSet, "id-3");
Record header = client.getHeader(null, key);
System.out.format("key=%s generation=%s expiration=%s\n", key.userKey, header.generation, header.expiration);;
Output:
key=id-3 generation=1 expiration=355535130
Batch Retrieval
A batch operation operates on a list of records identified by the keys provided. This works similar to a single record retrieval, except multiple records are returned.
SELECT * FROM namespace.set WHERE id IN key-list
Record[] Client::get(BatchPolicy policy, Key[] keys)
SELECT bins FROM namespace.set WHERE id in key-list
Record[] Client::get(BatchPolicy policy, Key[] keys, String... binNames)
// Records with user-keys "id-0" through "id-3" from the small set
Key[] keys = new Key[4];
for (int i = 0; i < 4; i++) {
keys[i] = new Key(Namespace, SmallSet, "id-"+i);
}
// Get "bin1" and "bin2"
Record[] records = client.get(null, keys, "bin1", "bin2");
System.out.format("Batch retrieval results:\n");
for (int i = 0; i < records.length; i++) {
Key key = keys[i];
Record record = records[i];
if (record != null) {
System.out.format("key=%s bins=%s\n", key.userKey, record.bins);
}
else {
System.out.format("Key not found: key=%s\n", key.userKey);
}
}
Output:
Batch retrieval results:
Key not found: key=id-0
key=id-1 bins={bin1=1, bin2=1001}
key=id-2 bins={bin1=2, bin2=1002}
key=id-3 bins={bin1=3, bin2=1003}
Record Existence
There is a variant of batch retrieval to check record existence.
SELECT id, EXISTS(SELECT * FROM namespace.set WHERE id = key) WHERE key IN key-list
boolean[] Client::exists(Policy policy, Key[] keys)
// Check existence of records with user-keys "id-0" through "id-3" in the small set
Key[] keys = new Key[4];
for (int i = 0; i < 4; i++) {
keys[i] = new Key(Namespace, SmallSet, "id-"+i);
}
boolean yesNo[] = client.exists(null, keys);
System.out.format("Batch exists results:\n");
for (int i = 0; i < 4; i++) {
System.out.format("key=%s exists=%s\n", keys[i].userKey, yesNo[i]);
}
Output:
Batch exists results:
key=id-0 exists=false
key=id-1 exists=true
key=id-2 exists=true
key=id-3 exists=true
Record Metadata
It is possible to obtain header info or metadata consisting of generation (or version) and expiration time (time-to-live in seconds) for a specified set of records.
SELECT generation, expiration FROM namespace.set WHERE id IN key-list
Record[] Client::getHeader(Policy policy, Key[] keys)
// Header info for records with user-keys "id-0" through "id-3" in the small set
Key[] keys = new Key[4];
for (int i = 0; i < 4; i++) {
keys[i] = new Key(Namespace, SmallSet, "id-"+i);
}
Record headers[] = client.getHeader(null, keys);
System.out.format("Batch metadata results:\n");
for (int i = 0; i < 4; i++) {
if (headers[i] != null) {
System.out.format("key=%s generation=%s expiration=%s\n", keys[i].userKey,
headers[i].generation, headers[i].expiration);
}
else {
System.out.format("Key not found: key=%s\n", keys[i].userKey);
}
}
Output:
Batch metadata results:
Key not found: key=id-0
key=id-1 generation=1 expiration=355535130
key=id-2 generation=1 expiration=355535130
key=id-3 generation=1 expiration=355535130
Union of Batch Retrievals
A more general form of batch reads is also available that provides a union of simple batch results with different namespace, set, and bin specification. It populates the argument "records" on return.
(SELECT bins1 FROM namespace1.set1 WHERE id IN key-list1) UNION (SELECT bins2 FROM namespace2.set2 WHERE id IN key-list2) UNION ...
void Client::get(BatchPolicy policy, List<BatchRead> records)
import java.util.List;
import com.aerospike.client.BatchRead;
// Array of records to specify batch parameters and get back results
List<BatchRead> records = new ArrayList<BatchRead>();
// bin2 of record id-1 from the small set
records.add(new BatchRead(new Key(Namespace, SmallSet, "id-" + 1), new String[]{"bin2"}));
// All bins of record id-1 from the null set
records.add(new BatchRead(new Key(Namespace, NullSet, "id-" + 1), true));
// No data bins (only headers) of record id-2 from the small set
records.add(new BatchRead(new Key(Namespace, SmallSet, "id-" + 2), false));
// This record should be found, but the requested bin will not be found.
records.add(new BatchRead(new Key(Namespace, SmallSet, "id-" + 3), new String[]{"no-such-bin"}));
// This record should not be found.
records.add(new BatchRead(new Key(Namespace, SmallSet, "no-such-key"), true));
// Execute batch. Note, records array is populated on return.
client.get(null, records);
System.out.format("Union of multi batch results:\n");
for (BatchRead record : records) {
Key key = record.key;
Record rec = record.record;
if (rec != null) {
System.out.format("set='%s' key=%s bins=%s\n", key.setName, key.userKey, rec.bins);
}
else {
System.out.format("Key not found: set='%s' key=%s\n", key.setName, key.userKey);
}
}
Output:
Union of multi batch results:
set='sql-select-small' key=id-1 bins={bin2=1001}
set='' key=id-1 bins={bin1=1, bin2=1001}
set='sql-select-small' key=id-2 bins=null
set='sql-select-small' key=id-3 bins=null
Key not found: set='sql-select-small' key=no-such-key
Predicate Based Retrieval
In these operations, records matching a general predicate (or a condition) are retrieved.
SELECT bins FROM namespace.set WHERE condition
There are multiple ways of performing this SQL query in Aerospike. They involve query and scan operations.
- Query operation using an index and/or expression filter
- Scan operation using an expression filter
The query operation must be used when an index is involved, but may be used without an index. The scan operation can only be used without an index.
Query Based on Index
In SQL, an index if applicable is used automatically. In Aerospike, one must know the index and specify it explicitly in the statement argument in a query operation.
Record[] query(QueryPolicy policy, Statement statement)
Create a secondary index
To use the query API with index based filter, a secondary index must exist on the filter bin. Here we create a numeric index on "bin1" in "sql-select-small" set.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
String IndexName = "test_small_bin1_number_idx";
Policy policy = new Policy();
policy.socketTimeout = 0; // Do not timeout on index create.
try {
IndexTask task = client.createIndex(policy, Namespace, SmallSet, IndexName,
"bin1", IndexType.NUMERIC);
task.waitTillComplete();
}
catch (AerospikeException ae) {
if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
throw ae;
}
}
System.out.format("Created index %s on ns=%s set=%s bin=%s.",
IndexName, Namespace, SmallSet, "bin1");;
Output:
Created index test_small_bin1_number_idx on ns=test set=sql-select-small bin=bin1.
Run the Query Based on Index
import com.aerospike.client.query.Statement;
import com.aerospike.client.query.Filter;
import com.aerospike.client.Value;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.Record;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.exp.Exp;
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SmallSet);
stmt.setFilter(Filter.range("bin1", 1, 3));
RecordSet rs = client.query(null, stmt);
System.out.format("Query based on bin1 index results\n");
while (rs.next()) {
Key key = rs.getKey();
Record record = rs.getRecord();
System.out.format("key=%s bins=%s\n", key.userKey, record.bins);
}
rs.close();
Output:
Query based on bin1 index results
key=id-1 bins={bin1=1, bin2=1001}
key=id-2 bins={bin1=2, bin2=1002}
key=id-3 bins={bin1=3, bin2=1003}
Query Based on Expression Filter
A general condition is specified as an expression filter, which does not use any underlying index. A synchronous call returns an array of records.
Record[] query(QueryPolicy policy, Statement statement)
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SmallSet);
// expression filter 1001 <= bin2 <= 1003 is specified in the operation policy
QueryPolicy policy = new QueryPolicy(client.queryPolicyDefault);
policy.filterExp = Exp.build(
Exp.and(
Exp.ge(Exp.intBin("bin2"), Exp.val(1001)),
Exp.le(Exp.intBin("bin2"), Exp.val(1003))));
RecordSet rs = client.query(policy, stmt);
System.out.format("Query based on expression results\n");
while (rs.next()) {
Key key = rs.getKey();
Record record = rs.getRecord();
System.out.format("key=%s bins=%s\n", key.userKey, record.bins);
}
rs.close();
Output:
Query based on expression results
key=id-2 bins={bin1=2, bin2=1002}
key=id-3 bins={bin1=3, bin2=1003}
key=id-1 bins={bin1=1, bin2=1001}
Scan Based on Expression Filter
The scan operation takes a callback object which is called for every record in the result.
void scanAll(ScanPolicy policy, String namespace, String setName, ScanCallback callback, String... binNames)
import com.aerospike.client.ScanCallback;
import com.aerospike.client.policy.ScanPolicy;
public class ScanParallel implements ScanCallback {
public void scanCallback(Key key, Record record) {
System.out.format("key=%s bins=%s\n", key.userKey, record.bins);
}
}
// expression filter 1 <= bin1 <= 3 is specified in the operation policy
ScanPolicy policy = new ScanPolicy();
policy.filterExp = Exp.build(
Exp.and(
Exp.ge(Exp.intBin("bin1"), Exp.val(1)),
Exp.le(Exp.intBin("bin1"), Exp.val(3))));
System.out.format("Scan with expression filter results:\n");
client.scanAll(policy, Namespace, SmallSet, new ScanParallel());
Output:
Scan with expression filter results:
key=id-2 bins={bin1=2, bin2=1002}
key=id-3 bins={bin1=3, bin2=1003}
key=id-1 bins={bin1=1, bin2=1001}
Key Points
Here are some key points to remember about query and scan operations in Aerospike.
- To leverage an index, one must use a query operation.
- A query takes either or both: an index predicate and an expression filter.
- An expression filter may be used instead of an index predicate, but it will not perform as well.
- When only an expression filter is needed, either a query or a scan may be used (as shown above).
- A null set value when an index predicate is used works on the null set (records belonging to no set), but without an index predicate works on the entire namespace.
- An expression filter is specified within the policy, and is applied generally for filtering records beyond query and scan. You can find examples of this outside of this tutorial.
Some of these are illustrated with examples in the following cells.
To leverage an index, one must use a query operation.
If an index predicate is used on an unindexed bin, it results in an error.
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SmallSet);
// try to use an index predicate on bin2 which has no index
stmt.setFilter(Filter.range("bin2", 1004, 1007));
try {
RecordSet rs = client.query(null, stmt);
System.out.format("Query with index predicate on unindexed bin results:\n");
while (rs.next()) {
Key key = rs.getKey();
Record record = rs.getRecord();
System.out.format("key=%s bin1=%s bin2=%s\n", key.userKey, record.bins);
}
rs.close();
}
catch (AerospikeException ae) {
System.out.format("Error: %s", ae);
}
Output:
Query with index predicate on unindexed bin results:
Error: com.aerospike.client.AerospikeException: Error 201,1,30000,0,5,BB9020011AC4202 127.0.0.1 3000: Index not found
A query takes either or both: an index predicate and an expression filter.
Below is an example of using both of them in a query. When both are specified, the two are ANDed.
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(SmallSet);
// index predicate to get bin1 in range 1 and 3
stmt.setFilter(Filter.range("bin1", 1, 3));
// expression filter to get bin1 in range 2 and 4
QueryPolicy policy = new QueryPolicy(client.queryPolicyDefault);
policy.filterExp = Exp.build(
Exp.and(
Exp.ge(Exp.intBin("bin1"), Exp.val(2)),
Exp.le(Exp.intBin("bin1"), Exp.val(4))));
RecordSet rs = client.query(policy, stmt);
System.out.format("Query based on index predicate and expression results\n");
while (rs.next()) {
Key key = rs.getKey();
Record record = rs.getRecord();
System.out.format("key=%s bins=%s\n", key.userKey, record.bins);
}
rs.close();
Output:
Query based on index predicate and expression results
key=id-2 bins={bin1=2, bin2=1002}
key=id-3 bins={bin1=3, bin2=1003}
A null set value when an index predicate is used works records belonging to no (null) set, but without an index predicate works on the entire namespace.
The scope of an index is a set. An index must exist on the null set when an index predicate is used with the null set.
// query with a null set
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(NullSet);
// the filter selects records with bin1=3 in all sets
QueryPolicy policy = new QueryPolicy(client.queryPolicyDefault);
policy.filterExp = Exp.build(
Exp.eq(Exp.intBin("bin1"), Exp.val(3)));
RecordSet rs = client.query(policy, stmt);
System.out.format("Query with expression filter on null set results:\n");
while (rs.next()) {
Key key = rs.getKey();
Record record = rs.getRecord();
System.out.format("set=%s key=%s bins=%s\n", key.setName, key.userKey, record.bins);
}
rs.close();
// scan with a null set
public class ScanParallel implements ScanCallback {
public void scanCallback(Key key, Record record) {
System.out.format("set=%s key=%s bins=%s\n", key.setName, key.userKey, record.bins);
}
}
ScanPolicy policy = new ScanPolicy();
policy.filterExp = Exp.build(
Exp.eq(Exp.intBin("bin1"), Exp.val(3)));
System.out.format("\nScan with expression filter on null set results:\n");
client.scanAll(policy, Namespace, null, new ScanParallel()); // null set
Output:
Query with expression filter on null set results:
set=sql-select-large key=id-3 bins={bin1=3, bin2=1003}
set=sql-select-small key=id-3 bins={bin1=3, bin2=1003}
set=null key=id-3 bins={bin1=3, bin2=1003}
Scan with expression filter on null set results:
set=sql-select-large key=id-3 bins={bin1=3, bin2=1003}
set=sql-select-small key=id-3 bins={bin1=3, bin2=1003}
set=null key=id-3 bins={bin1=3, bin2=1003}
Computed Fields with Server Function
An arbitrary function registered on the server (UDF) is invoked on the specified record. In this tutorial, we deal with a single record oriented functions as opposed to "stream oriented" functions. The latter will be discussed in a subsequent notebook on Aggregates in this series.
SELECT func() FROM namespace.set WHERE id = key
Object execute(WritePolicy policy, Key key, String packageName, String functionName, Value... functionArgs)
The API returns a generic Object which can be anything like a single value or a dictionary. Note, UDFs may not be appropriate for performance sensitive applications; for record-oriented functions, simply retrieving the record and computing the function on the client site may be faster. A read-write function may be alternatively implemented atomically on the client side using the read-modify-write pattern.
Create User Defined Function (UDF)
Examine the following Lua code that takes two bins and returns their sum and product. Create a "udf" directory under "java" and create a file "computed_fields.lua" with this Lua code.
Add the following code to the file "computed_fields.lua" in the sub-directory "udf":
<pre>
-- computed_fields.lua - return sum and product of specified bins
function sum_and_product(rec, binName1, binName2)
local ret = map() -- Initialize the return value (a map)
ret[binName1] = rec[binName1]
ret[binName2] = rec[binName2]
ret['sum'] = rec[binName1] + rec[binName2]
ret['product'] = rec[binName1] * rec[binName2]
return ret
end
</pre>
Register UDF
Register the UDF with the server.
import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.Language;
String UDFDir = "./udf";
String UDFFile = "computed_fields.lua";
client.removeUdf(null, UDFFile);
RegisterTask task = client.register(policy, UDFDir+"/"+UDFFile,
UDFFile, Language.LUA);
task.waitTillComplete();
System.out.format("Registered the UDF module %s.", UDFFile);;
Output:
Registered the UDF module computed_fields.lua.
Execute UDF
Execute the UDF to retrieve the computed values.
// the UDF function returns sum and product of the specified bins
Key key = new Key(Namespace, NullSet, "id-1");
Object obj = client.execute(null, key, "computed_fields", "sum_and_product", Value.get("bin1"), Value.get("bin2"));
System.out.format("Computed fields using UDF results:\n");
System.out.format("key=%s object=%s\n", key.userKey, obj.toString());;
Output:
Computed fields using UDF results:
key=id-1 object={product=1001, sum=1002, bin1=1, bin2=1001}
Takeaways and Conclusion
Many developers that are familiar with SQL would like to see how SQL operations translate to Aerospike. We looked at how to implement various SELECT statements. This should be generally useful irrespective of the reader's SQL knowledge. While the examples here use synchronous execution, many operations can also be performed asynchronously.
Clean up
Remove tutorial data and close connection.
client.dropIndex(null, Namespace, SmallSet, IndexName);
client.truncate(null, Namespace, null, null);
client.close();
System.out.println("Removed tutorial data and closed server connection.");
Output:
Removed tutorial data and closed server connection.
Further Exploration and Resources
Here are some links for further exploration
Resources
- Related notebooks
- Queries
- Other notebooks in the SQL series on 1) UPDATE, CREATE, and DELETE, and 2) Aggregates.
- Aerospike Presto Connector
- Github repos
- Documentation
Next steps
Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open, and select Upload.