Bulk loading for distributed mode
Overview
The guidelines on this page are a framework for efficiently bulk-loading large graph datasets into an Aerospike database using the distributed mode of the Aerospike Graph Service (AGS) bulk loader.
Loading large volumes of graph data is a computationally demanding task. It can be best optimized by harnessing the extensive computational capabilities of cloud services such as Amazon Web Services (AWS) or Google Cloud Platform (GCP) in tandem with the parallel-processing tool Apache Spark.
Architectural overview
The distributed mode of the AGS bulk loader is a Java executable, distributed as a JAR file. The bulk loader runs as part of a Spark job, which distributes the workload across a cluster of servers.
The bulk loader reads the source data files and loads them using configuration information stored in the AGS properties file. After the Spark job finishes, you can query the edges and vertices of your dataset with AGS.
Elements of the bulk loader
All elements of the bulk loader must reside in the same cloud network and be able to access one another. Those elements include:
- The JAR file
- The Aerospike Database cluster
- The source data files
- The AGS properties file
- The Spark cluster
Prerequisites
- An Aerospike Database instance running on your cloud service. See Deploying on GCP for help with GCP, or Deploying on AWS for help with AWS.
- A cloud bucket with write access for data processing.
- Source data files for edges and vertices, in the Gremlin CSV format, stored in cloud buckets.
- The bulk loader JAR file, stored in a cloud bucket.
- An AGS properties file, stored in a cloud bucket.
Source data files
The bulk loader accepts data files in the Gremlin CSV format, with vertices and edges specified in separate files. All CSV files must have header information with names for each column of data.
AGS does not support user-provided ~id values for edges, so the ~id column is optional for edge CSV files. If your CSV file contains an ~id column, the values are ignored.
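As an illustration only, a vertex file and an edge file in this format might look like the following. The column names, property types, and values are hypothetical placeholders; use the headers that match your own data.

A vertex file (for example, people/vert_file1.csv):

~id,~label,name:String,age:Int
v1,people,Alice,34
v2,people,Bob,29

An edge file (for example, knows/edge_file3.csv), with no ~id column:

~from,~to,~label,since:Int
v1,v2,knows,2015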
Data files must be stored in the directories specified by the aerospike.graphloader.vertices and aerospike.graphloader.edges configuration options.
- The directory specified in aerospike.graphloader.vertices must contain one or more subdirectories of vertex CSV files.
- The directory specified in aerospike.graphloader.edges must contain one or more subdirectories of edge CSV files.
- Each subdirectory should be named for the label of the data files it contains. For example, a subdirectory of vertex files containing data about people should be named people. A subdirectory of edge files containing data about connections between people vertices, in which each row has the knows label, should be named knows.
For example, if your cloud bucket is named MY_BUCKET, that bucket must contain separate directories for edge and vertex data files, and those directories must contain subdirectories for the CSV files. If aerospike.graphloader.vertices is set to gs://MY_BUCKET/vertices, you might have subdirectories named gs://MY_BUCKET/vertices/people and gs://MY_BUCKET/vertices/places, each containing one or more CSV files.
Example cloud directory structure:
/MY_BUCKET
|
---- /MY_BUCKET/vertices/
|
-------- /MY_BUCKET/vertices/people/
|
------------ /MY_BUCKET/vertices/people/vert_file1.csv
------------ /MY_BUCKET/vertices/people/vert_file2.csv
|
-------- /MY_BUCKET/vertices/places/
|
------------ /MY_BUCKET/vertices/places/vert_file3.csv
------------ /MY_BUCKET/vertices/places/vert_file4.csv
|
---- /MY_BUCKET/edges/
|
-------- /MY_BUCKET/edges/worksWith/
|
------------ /MY_BUCKET/edges/worksWith/edge_file1.csv
------------ /MY_BUCKET/edges/worksWith/edge_file2.csv
|
-------- /MY_BUCKET/edges/knows/
|
------------ /MY_BUCKET/edges/knows/edge_file3.csv
------------ /MY_BUCKET/edges/knows/edge_file4.csv
|
---- /MY_BUCKET/configs/bulk-loader.properties
---- /MY_BUCKET/logs
The properties file
The AGS properties file contains configuration information for AGS, such as the network address of the Aerospike Database. To use the bulk loader, make a copy of your config.properties file, append the relevant bulk loader configuration properties, and save it in the same cloud bucket with your source data files and bulk loader JAR file.
The following table lists some required and optional bulk loader configuration options.
Configuration key | Required? | Default | Description |
---|---|---|---|
aerospike.graphloader.edges | yes | none | URI path to edge CSVs. On GCP: gs://PATH_TO_BUCKET/edges/ On AWS: s3://PATH_TO_BUCKET/edges/ |
aerospike.graphloader.vertices | yes | none | URI path to vertex CSVs. On GCP: gs://PATH_TO_BUCKET/vertices/ On AWS: s3://PATH_TO_BUCKET/vertices/ |
aerospike.graphloader.sampling-percentage | no | 0 | Percentage of loaded elements to read back after writing to verify successful bulk loading when the -verify_output_data flag is enabled. |
aerospike.graphloader.temp-directory | yes | none | URI path to an empty directory with read/write access that can be used temporarily to store transformed intermediate data for bulk loading. Users must verify that the specified location is empty. This directory is not removed by the application after job completion. The temp directory must be on the same platform as the edges and vertices files. On GCP: gs://PATH_TO_BUCKET/temp/ On AWS: s3://PATH_TO_BUCKET/temp/ |
The aerospike.graphloader.edges and aerospike.graphloader.vertices options specify the location of your edge and vertex source data files. Verify that the bulk loader process has access to the cloud bucket containing your source files.
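As a sketch, a bulk loader properties file for a GCP job might look like the following. The connection keys shown here are illustrative; keep whatever connection settings your existing config.properties already contains, and substitute your own bucket paths:

# Connection settings copied from your existing config.properties
aerospike.client.host=10.0.0.10
aerospike.client.port=3000
# Bulk loader settings appended for this job
aerospike.graphloader.vertices=gs://MY_BUCKET/vertices/
aerospike.graphloader.edges=gs://MY_BUCKET/edges/
aerospike.graphloader.temp-directory=gs://MY_BUCKET/temp/
aerospike.graphloader.sampling-percentage=10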
For a full description of the properties file and a list of configuration options, see configuration options.
Specifying the location of the properties file
Use the -aerospike.graphloader.config option in your Spark command to specify the location of your AGS properties file. The -aerospike.graphloader.config option may be abbreviated as -c.
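For example, if you invoke the bulk loader with spark-submit directly rather than through a managed service, the option follows the JAR file. This is a sketch only: the class name matches the one used in the EMR script below, and the JAR name and bucket path are placeholders.

spark-submit \
  --class com.aerospike.firefly.bulkloader.SparkBulkLoaderMain \
  aerospike-graph-bulk-loader-x.y.z.jar \
  -c gs://MY_BUCKET/configs/bulk-loader.properties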
Cloud-specific options
The bulk loader supports cloud-specific authentication options for cloud services that require credentials to run a Spark job.
The cloud-specific configuration options must be included as part of the bulk loader execution command. They cannot be part of the AGS properties file.
Name | Description |
---|---|
aerospike.graphloader.remote-user | On AWS: your AWS_ACCESS_KEY_ID value. On GCP: your key file private_key_id value. |
aerospike.graphloader.remote-passkey | On AWS: your AWS_SECRET_ACCESS_KEY value. On GCP: your key file private_key value. |
aerospike.graphloader.gcs-email | On GCP: your key file client_email value. |
When using Google Cloud Storage (GCS) for source data files, you must configure a GCS Service Account. Your credential information can be found in the JSON-generated key file for the GCS Service Account.
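As a sketch of one way to pull those values out of a service account key file on GCP. The key file path is a placeholder, the jq field names match the standard service account key file, and it is assumed here that each option is passed on the command line in the same -key value form as the other bulk loader arguments:

KEY_FILE="my-service-account-key.json"   # placeholder path to your GCS Service Account key file
REMOTE_USER=$(jq -r '.private_key_id' "$KEY_FILE")
REMOTE_PASSKEY=$(jq -r '.private_key' "$KEY_FILE")
GCS_EMAIL=$(jq -r '.client_email' "$KEY_FILE")
# Appended after the properties file argument in the job submission command:
#   -aerospike.graphloader.remote-user "$REMOTE_USER" \
#   -aerospike.graphloader.remote-passkey "$REMOTE_PASSKEY" \
#   -aerospike.graphloader.gcs-email "$GCS_EMAIL"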
Cloud CLI procedures
Select a cloud provider from the following tabs.
- AWS
- GCP
Loading with Amazon Web Services (AWS) CLI
Install the AWS CLI tool if you don't already have it.
Your default CLI profile must have sufficient permissions to create Amazon EC2 resources.
Launch an Elastic Map Reduce cluster
The following shell script sets all the environment variables and file paths needed to launch an Elastic Map Reduce (EMR) cluster and run an AGS bulk data loading job.
When you edit the script, verify that all configuration elements are correct for your environment.
The S3 bucket must already exist.
The bulk loader JAR file and properties file must be present in the specified locations.
Verify that the source data files are correctly specified in the properties file and accessible by your default CLI profile.
Save the edited script in a file such as launch-cluster.sh and run it at the command line.

#!/bin/bash
CLUSTER_NAME="Aerospike AWS Graph Cluster"
EMR_RELEASE="emr-6.15.0"
# Application logs are saved here. Edit this and all other S3 bucket names
# to match your S3 setup.
LOG_URI="s3://my_bucket/logs/"
SPARK_JOB_NAME="Aerospike Graph AWS Spark Job"
SPARK_CLASS="com.aerospike.firefly.bulkloader.SparkBulkLoaderMain"
# Update SPARK_JAR with your S3 bucket and the current bulk loader .jar file
SPARK_JAR="s3://my_bucket/jars/aerospike-graph-bulk-loader-x.y.z.jar"
# Add more Spark arguments as needed. See the main bulk loading
# documentation page for a list of available Spark flags.
SPARK_ARGS="-c,s3://my_bucket/configs/bulk-loader.properties"
AWS_REGION="us-west-1"
# Use the same subnet ID and AWS region for the EMR cluster and
# the Aerospike Database cluster for optimal performance.
#
# Note: if you create your Aerospike Database cluster with Aerolab,
# the AWS network and security group information is shown at the time of creation.
SUBNET_ID="subnet-##############" # Edit with your subnet ID
SECURITY_GROUP="sg-##############" # Edit with your security group ID
# Use Java 11.
CONFIGURATIONS='[{"Classification":"hadoop-env","Configurations":[{"Classification":"export","Configurations":[],"Properties":{"JAVA_HOME":"/usr/lib/jvm/java-11-amazon-corretto.x86_64"}}],"Properties":{}},{"Classification":"spark-env","Configurations":[{"Classification":"export","Configurations":[],"Properties":{"JAVA_HOME":"/usr/lib/jvm/java-11-amazon-corretto.x86_64"}}],"Properties":{}},{"Classification":"spark-defaults","Properties":{"spark.executorEnv.JAVA_HOME":"/usr/lib/jvm/java-11-amazon-corretto.x86_64"}}]'
# Create the EMR cluster.
echo "Creating EMR cluster..."
CLUSTER_ID=$(aws emr create-cluster \
--name "$CLUSTER_NAME" \
--release-label "$EMR_RELEASE" \
--applications Name=Spark \
--log-uri "$LOG_URI" \
--use-default-roles \
--instance-type m5.xlarge \
--instance-count 4 \
--ec2-attributes SubnetId="$SUBNET_ID",EmrManagedSlaveSecurityGroup="$SECURITY_GROUP",EmrManagedMasterSecurityGroup="$SECURITY_GROUP" \
--configurations "$CONFIGURATIONS" \
--query 'ClusterId' \
--region "$AWS_REGION" \
--output text)
# The cluster ID may be needed later for operational purposes, such as
# deactivating the cluster.
echo "Cluster ID: $CLUSTER_ID"
# Start the Spark job.
echo "Adding Spark job step..."
STEP_ID=$(aws emr add-steps --cluster-id "$CLUSTER_ID" \
--steps Type=Spark,Name="$SPARK_JOB_NAME",ActionOnFailure=CONTINUE,Args=[--class,"$SPARK_CLASS","$SPARK_JAR",$SPARK_ARGS] \
--query 'StepIds[0]' \
--output text \
--region "$AWS_REGION")
echo "Step ID: $STEP_ID"Loggingโ
Log files for the bulk loading job are saved to the location specified in the LOG_URI variable.
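To check on the job while it runs, you can use standard AWS CLI commands against the cluster and the log bucket, for example:

# Check the status of the bulk loading step
aws emr describe-step \
  --cluster-id "$CLUSTER_ID" \
  --step-id "$STEP_ID" \
  --query 'Step.Status.State' \
  --output text \
  --region "$AWS_REGION"

# List the log files written so far (edit the bucket name to match LOG_URI)
aws s3 ls --recursive s3://my_bucket/logs/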
Loading with the gcloud command-line interface (CLI)
Install the gcloud CLI if you don't already have it. Use the gcloud init command to authorize commands on your GCP account.

The following shell script sets all the environment variables and file paths needed to launch a Spark job on GCP and run an AGS bulk data loading job.
Edit the script carefully to ensure that all the configuration elements are correct for your environment. The GCP bucket must already exist, and the bulk loader JAR file and properties file must be present in the specified locations. Ensure that the source data files are correctly specified in the properties file and accessible by your default CLI profile.
Save the edited script in a file such as launch-cluster.sh and run it at the command line when ready.

#!/bin/bash
# Edit all these variables to match your GCP environment.
# Ensure that the bulk loader .jar file is correctly named and
# accessible by your CLI profile.
dataproc_name="testcluster"
region=us-central1
zone=us-central1-a
instance_type=n2d-highmem-8
num_workers=8
project=my-project
bulk_jar_uri="gs://my_bucket/aerospike-graph-bulk-loader-x.y.z.jar"
properties_file_uri="gs://my_bucket/bulk-loader.properties"
# Execute the dataproc command
gcloud dataproc clusters create "$dataproc_name" \
--enable-component-gateway \
--region $region \
--zone $zone \
--master-machine-type "$instance_type" \
--master-boot-disk-type pd-ssd \
--master-boot-disk-size 500 \
--num-workers "$num_workers" \
--worker-machine-type "$instance_type" \
--worker-boot-disk-type pd-ssd \
--worker-boot-disk-size 500 \
--image-version 2.1-debian11 \
--properties spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
--project $project
gcloud dataproc jobs submit spark \
--class=com.aerospike.firefly.bulkloader.SparkBulkLoader \
--jars="$bulk_jar_uri" \
--cluster="$dataproc_name" \
--region="$region" \
-- -c "$properties_file_uri"
Bulk data loading job stages and steps
A bulk data loading job consists of three stages:
- Prep
- Partially written
- Complete
Within these three stages are several steps:
Stage | Step | Description |
---|---|---|
Prep | Resuming job | First step when resuming a previously started job. |
Prep | Read vertex/edge files | Check directory structure. |
Prep | Preflight check | Verify that the CSV format is properly formatted and parsable. |
Prep | Temporary data writing | Intermediate transformative step to generate data for efficient writing of graph elements to the database. |
Partially written | Supernode extraction | Detect supernodes in the dataset to properly write them. |
Partially written | Vertex writing | Write vertices to the database. |
Partially written | Vertex validation | Validate accuracy of written vertices using graph traversal queries. |
Partially written | Edge writing | Write edges to the database. |
Partially written | Edge validation | Validate accuracy of written edges using graph traversal queries. |
Complete | N/A | The job has completed successfully: vertices and edges are loaded, and resume-job information is removed. |
If a loading job fails at the Partially written stage due to an error, you can restart the job at the point at which it failed by rerunning the job submission command with the -resume flag.
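For example, on GCP a failed job can be resubmitted against the same cluster and properties file with the -resume flag described in the flags table below:

gcloud dataproc jobs submit spark \
  --class=com.aerospike.firefly.bulkloader.SparkBulkLoader \
  --jars="$bulk_jar_uri" \
  --cluster="$dataproc_name" \
  --region="$region" \
  -- -c "$properties_file_uri" -resume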
Incremental data loading
Incremental data loading allows you to:
- Add vertices to an existing graph.
- Add edges to new and existing vertices.
- Update properties of existing vertices.
To load data incrementally, add the -incremental_load flag to the Spark job submission command for your cloud service.
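For example, in the AWS script above you would extend SPARK_ARGS, and in the gcloud command you would append the flag after the properties file path:

# AWS (EMR): additional bulk loader argument in SPARK_ARGS
SPARK_ARGS="-c,s3://my_bucket/configs/bulk-loader.properties,-incremental_load"

# GCP (Dataproc): final arguments passed to the bulk loader
-- -c "$properties_file_uri" -incremental_load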
Spark job flags
The following flags are all optional.
Argument | Description |
---|---|
-incremental_load | Add new data to an existing graph. |
-validate_input_data | Perform format and data validation of all vertex and edge CSV files before writing to the Aerospike database. |
-verify_output_data | Perform verification of a percentage of loaded elements, specified by aerospike.graphloader.sampling-percentage , by reading them back after loading. The verification process uses a traversal query. |
-resume | Resume a previously failed job. |
-clear_existing_data | Delete all existing data before beginning the new job. |