In this blog post, we detail how to create an Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster that uses AWS Identity and Access Management (AWS IAM) roles and policies to authenticate client access. We then establish an Aerospike Database cluster, insert sample messages into the database, and observe in real time how these messages are streamed to Amazon MSK using Aerospike's Kafka Source Connector. Below we provide a step-by-step guide to implementing this process.
AWS MSK Kafka
In this section, you will set up a simple three-node Kafka cluster.
1. Visit the AWS console and select the MSK service.
2. Create a new cluster by selecting Create Cluster → Quick Create.
3. Select the provisioned cluster and instance type of kafka.t3.small.
4. Select the EBS storage type per broker of 10 GB.
NOTE: Take note of the VPC, subnets, and security group ID, as you will require these details later in the article.
The next step is the critical step where you will create the AWS IAM policy and roles. This setup ensures that the Aerospike Database authenticates using AWS IAM to write data to MSK.
5. From the AWS Console, select the AWS IAM service.
6. To create a new AWS IAM policy, copy the following JSON and paste it in the JSON tab. Replace region:Account-ID with your own region and AWS account ID.
7. Save the policy and name it msk-tutorial-policy.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster"
],
"Resource": [
"arn:aws:kafka:region:Account-ID:cluster/MSKTutorialCluster/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:*Topic*",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData"
],
"Resource": [
"arn:aws:kafka:region:Account-ID:topic/MSKTutorialCluster/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource": [
"arn:aws:kafka:region:Account-ID:group/MSKTutorialCluster/*"
]
}
]
}
8. Create the IAM role.
9. Under Common Use Cases, select EC2 and then Next.
10. Under Permissions, select the policy named msk-tutorial-policy and then Next.
11. Give the role a name like msk-tutorial-role and click the Create Role button.
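The placeholder replacement from step 6 can also be scripted if you keep the policy JSON in a local file. The following is a minimal sketch under that assumption; the function name fill_policy_placeholders and the sample account ID are illustrative and not part of any AWS tooling.

```shell
# Replace the "region:Account-ID" placeholder in the policy ARNs.
# Reads policy JSON on stdin and writes the filled-in policy to stdout.
# Arguments: <aws-region> <aws-account-id> (both supplied by you).
fill_policy_placeholders() {
  local region="$1" account_id="$2"
  sed "s/arn:aws:kafka:region:Account-ID:/arn:aws:kafka:${region}:${account_id}:/g"
}

# Example with a single ARN line; 123456789012 is a made-up account ID.
echo '"arn:aws:kafka:region:Account-ID:topic/MSKTutorialCluster/*"' \
  | fill_policy_placeholders us-east-1 123456789012
```

To process a whole file, redirect it into the function, for example fill_policy_placeholders us-east-1 123456789012 < policy.json, where policy.json is whatever name you saved the JSON under.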
Kafka client machine
Next, create a client machine to install the Kafka tools necessary to access our MSK cluster.
1. Create a new EC2 instance using type t2.micro.
2. Use the default AMI: Amazon Linux 2023
The AMI may be different depending on your region
3. Create a key-pair if required. I am using an already existing key-pair.
4. Under Advanced Options, in the IAM instance profile field, select the IAM role created earlier.
5. Launch the instance!
6. Under instances launched, choose the instance you just created. Click on the ‘Security’ tab and note the security group associated with this instance. e.g., sg-0914e6271c97ae4c9 (launch-wizard-1)
7. Navigate to the VPC section and select Security Groups from the left-hand menu. Locate the security group associated with the MSK cluster, such as sg-e5f51dfb, and choose Edit Inbound Rules.
8. Create a new rule to allow all traffic from the new ec2 instance.
Kafka topics
After successfully establishing your initial Kafka cluster and Kafka client machine, proceed to conduct testing. Verify the functionality by accessing the MSK cluster, creating a topic, producing and consuming sample messages, and ensuring that everything operates as anticipated.
1. From the MSK Cluster, note the Kafka version being used. This example uses 2.8.1.
2. From the Kafka client machine, install Java 11+.
sudo yum -y install java-11
3. Download Apache Kafka using wget, then extract the archive using tar.
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xzf kafka_2.12-2.8.1.tgz
4. To use IAM, you will need the MSK IAM Auth jar file. Download the jar to the Kafka libs folder you just extracted.
cd kafka_2.12-2.8.1/libs/
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
cd ../bin/
5. Create a file called client.properties to use when authenticating to MSK. It will define the SASL mechanism to use and reference the Java class file that will handle your IAM callbacks.
cat <<EOF > client.properties
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
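Before moving on, you may want to confirm that the file contains all four settings, since a missing line tends to surface later as an opaque authentication failure. The helper below is a sketch, not part of Kafka or MSK; the name check_client_properties is made up for this article.

```shell
# Report any of the four IAM-related settings missing from a properties file.
# Returns non-zero if at least one key is absent.
check_client_properties() {
  local file="$1" key missing=0
  for key in security.protocol sasl.mechanism sasl.jaas.config \
             sasl.client.callback.handler.class; do
    if ! grep -q "^${key}=" "$file"; then
      echo "missing: ${key}" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

From the kafka/bin folder, you could run check_client_properties client.properties && echo "client.properties looks complete".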
Creating topics
Go to the AWS Console and view the MSK Cluster Client Information. There will be three endpoints to choose from, but you only require one.
For example, choose: b-2.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098
From the kafka/bin folder, run the command to create a topic. Let's call it aerospike-airforce-1.
export BootstrapServerString="b-2.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098"
./kafka-topics.sh --create --bootstrap-server $BootstrapServerString --command-config client.properties --replication-factor 3 --partitions 1 --topic aerospike-airforce-1
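If you copy the endpoints as a single comma-separated string, which is an assumption about how your console presents them, a small helper can pick out one entry for the BootstrapServerString variable. The function name first_broker and the example host names are hypothetical.

```shell
# Print the first host:port entry of a comma-separated bootstrap string.
first_broker() {
  printf '%s\n' "${1%%,*}"
}

# Hypothetical three-broker string; substitute the value from your own cluster.
brokers="b-1.example:9098,b-2.example:9098,b-3.example:9098"
BootstrapServerString="$(first_broker "$brokers")"
export BootstrapServerString
echo "$BootstrapServerString"
```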
Listing topics
To list the topics, use the following command. Notice our latest topic, called aerospike-airforce-1, just showed up.
./kafka-topics.sh --bootstrap-server $BootstrapServerString --command-config client.properties --list
MSKTutorialTopic
__amazon_msk_canary
__consumer_offsets
aerospike
aerospike-airforce-1
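Rather than scanning the list by eye, you can pipe it through a small check. This is a sketch; topic_exists is a name invented here, and it only inspects whatever text it receives on standard input.

```shell
# Succeed if the given topic name appears as a whole line on stdin.
# -x matches the entire line, -F treats the name as a literal string.
topic_exists() {
  grep -qxF -- "$1"
}
```

For example: ./kafka-topics.sh --bootstrap-server $BootstrapServerString --command-config client.properties --list | topic_exists aerospike-airforce-1 && echo "topic found". Matching the whole line matters here because the list also contains a topic named aerospike, which is a prefix of the new topic name.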
Producer and consumer
This is admittedly more of a Kafka 101 than a simple Hello World. Nonetheless, it is essential to test the configuration by sending and receiving messages on the designated Kafka topic before proceeding further.
Produce some messages by opening a new window and running the following Kafka producer command. Type three or four messages, hitting the 'Return' key after each message.
./kafka-console-producer.sh --broker-list $BootstrapServerString --producer.config client.properties --topic aerospike-airforce-1
>Instrument Check
>Pre flight checks confirmed
>Ready for takeoff
>Full throttle, flaps
You're now ready to start a client consumer application. Open a new window and run the consumer. You should now see the same messages you published earlier.
./kafka-console-consumer.sh --bootstrap-server $BootstrapServerString --consumer.config client.properties --topic aerospike-airforce-1 --from-beginning
Instrument Check
Pre flight checks confirmed
Ready for takeoff
Full throttle, flaps
Database source
Let's review your achievements thus far. You've established a 3-node Kafka cluster in AWS utilizing MSK, incorporating IAM roles and permissions. Additionally, you have successfully created topics and demonstrated the production and consumption of messages using the IAM credentials established during the setup.
The next phase of your journey involves installing the Aerospike Database, inserting messages, and configuring a simple XDR component. XDR (Cross Datacenter Replication) is crucial for transmitting data from the Aerospike Database to the Aerospike Kafka Source Connector, which then forwards the messages to Amazon MSK.
Create the Aerospike Database
1. Start by creating a new EC2 instance. For this demo, you can use Linux CentOS 8.
Rocky 8 AMI: ami-043ceee68871e0bb5 (us-east-1)
2. Select the instance type as t2.medium.
3. Add the extra volume for the Aerospike data storage layer. EBS volume is all that is required for now.
4. Launch the instance and connect to the host using ssh. If you have an Aerospike license feature file, upload it to the instance.
Install the Aerospike Database server
1. Run the following to install the Aerospike Database Server.
export VER="6.1.0.2"
sudo yum install java python3 openssl-devel wget git gcc maven bind-utils sysstat nc -y
wget -O aerospike-tools.tgz 'https://www.aerospike.com/download/tools/latest/artifact/el8'
tar -xvf aerospike-tools.tgz
cd aerospike-tools_*
sudo ./dep-check
sudo ./asinstall
wget -O aerospike.tgz https://enterprise.aerospike.com/enterprise/download/server/$VER/artifact/el8
tar -xvf aerospike.tgz
cd aerospike-server-enterprise-$VER-el8
sudo ./asinstall
sudo mkdir -p /var/log/aerospike/
sudo systemctl enable aerospike
2. Confirm the storage disk for Aerospike.
lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
xvda 202:0 0 10G 0 disk
└─xvda1 202:1 0 10G 0 part /
xvdb 202:16 0 10G 0 disk <<----------------- This one!
3. Once you have identified the data disk, replace the Aerospike configuration file at /etc/aerospike/aerospike.conf with the configuration file listed below, updating the following lines:
Under heartbeat.address, add your internal 172.x.x.x address.
For xdr.dc.node-address-port, enter the Kafka client machine's address followed by port 8080, separated by a space.
Aerospike Database configuration file for use with systemd
service {
# paxos-single-replica-limit 1 # Number of nodes where the replica count is automatically
proto-fd-max 15000
service-threads 10
feature-key-file /etc/aerospike/features.conf
node-id A1
cluster-name CLA
}
logging {
file /var/log/aerospike/aerospike.log {
context any info
}
}
# public and private addresses
network {
service {
address any
port 3000
}
heartbeat {
mode mesh
address 172.31.94.201
port 3002 # Heartbeat port for this node.
interval 150 # controls how often to send a heartbeat packet
timeout 10 # number of intervals after which a node is considered to be missing
}
fabric {
port 3001
}
info {
port 3003
}
}
namespace test {
replication-factor 2
memory-size 40G
default-ttl 0
index-type shmem
high-water-disk-pct 50
high-water-memory-pct 60
stop-writes-pct 90
nsup-period 0
storage-engine device {
device /dev/xvdb
data-in-memory false
write-block-size 128K
min-avail-pct 5
}
}
xdr {
# Change notification XDR block that round-robins between two connector nodes
dc aerospike-kafka-source {
connector true
node-address-port 172.31.58.190 8080
namespace test {
}
}
}
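The two site-specific values in this configuration, the heartbeat address and the connector's node-address-port, can be filled in with sed instead of by hand. The following is a minimal sketch that assumes the layout shown above; patch_aerospike_conf is a hypothetical helper name, and it prints the result rather than editing the file in place.

```shell
# Print a copy of an Aerospike config with the two site-specific values filled in.
# Arguments: <config-file> <this-node-private-ip> <kafka-client-ip>
# Only the address inside the heartbeat block is changed; "address any" in the
# service block is left alone. Port 8080 matches the connector's service port.
patch_aerospike_conf() {
  local conf="$1" node_ip="$2" connector_ip="$3"
  sed -E \
    -e "/heartbeat[[:space:]]*\{/,/\}/ s/^([[:space:]]*address[[:space:]]+).*/\1${node_ip}/" \
    -e "s/^([[:space:]]*node-address-port[[:space:]]+).*/\1${connector_ip} 8080/" \
    "$conf"
}
```

You might review the output first and then write it into place, for example patch_aerospike_conf aerospike.conf 172.31.94.201 172.31.58.190 > /tmp/aerospike.conf, using your own addresses in place of the ones from this article.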
Start the Aerospike service
1. Copy the license feature file to the Aerospike configuration directory.
sudo cp features.conf /etc/aerospike/
2. Start the Aerospike server and check the logs to ensure there are no errors.
sudo systemctl start aerospike
sudo systemctl status aerospike
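To check the log for problems without reading it line by line, you can filter on severity. This sketch assumes each log line carries its level in the form ": WARNING (" or ": CRITICAL (", which you should verify against your own log; log_is_clean is a name made up for this example.

```shell
# Print log lines at WARNING or CRITICAL level; succeed only if there are none.
# Assumption: the severity appears as ": WARNING (" or ": CRITICAL (" in each line.
log_is_clean() {
  if grep -E ': (WARNING|CRITICAL) \(' "$1"; then
    return 1
  fi
  return 0
}
```

With the logging path from the configuration above, that would be log_is_clean /var/log/aerospike/aerospike.log && echo "no warnings found". Some warnings at startup may be harmless, so treat the output as a prompt to look closer rather than a verdict.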
Aerospike Kafka Source Connector
The flow of data from Aerospike Database Enterprise Edition to Apache Kafka relies on the Aerospike Kafka source (outbound) connector. This connector subscribes to change notifications, converts them into messages, and dispatches them to Kafka topics. Go back to the EC2 instance you created earlier as the Kafka client and install the Aerospike Kafka Source Connector. This is your outbound connector for sending data from Aerospike to MSK.
sudo yum install java #( install 11+ JDK )
wget https://enterprise.aerospike.com/artifacts/enterprise/aerospike-kafka-outbound/5.0.1/aerospike-kafka-outbound-5.0.1-1.noarch.rpm
sudo rpm -i aerospike-kafka-outbound-5.0.1-1.noarch.rpm
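One way to keep the downloaded file name and the installed file name in step is to derive both from a single version variable. The sketch below reuses the URL pattern from the wget line above; the variable and function names are illustrative only.

```shell
# Hypothetical variable names; the URL pattern is taken from the wget line above.
CONNECTOR_VER="5.0.1"
CONNECTOR_RPM="aerospike-kafka-outbound-${CONNECTOR_VER}-1.noarch.rpm"
CONNECTOR_URL="https://enterprise.aerospike.com/artifacts/enterprise/aerospike-kafka-outbound/${CONNECTOR_VER}/${CONNECTOR_RPM}"

# Download and install using the same file name for both steps.
# Defined as a function so nothing is fetched until you call it.
install_connector() {
  wget "$CONNECTOR_URL" && sudo rpm -i "$CONNECTOR_RPM"
}

echo "$CONNECTOR_URL"
```

Calling install_connector then performs the download and the install with matching names.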
Configure the outbound connector
The terms “outbound” and “source connector” are used interchangeably in this article.
1. Locate the following file on the Kafka client box: /etc/aerospike-kafka-outbound/aerospike-kafka-outbound.yml
2. Under producer-props.bootstrap.servers, replace the broker address with one of the node addresses in the MSK Kafka cluster.
3. Then add the following contents to the file with the changes that have been outlined.
# Change the configuration for your use case.
#
# Refer to https://www.aerospike.com/docs/connectors/enterprise/kafka/outbound/configuration/index.html
# for details.
# The connector's listening ports, TLS, and network interface.
service:
port: 8080
# Format of the Kafka destination message.
format:
mode: flat-json
metadata-key: metadata
# Aerospike record routing to a Kafka destination.
routing:
mode: static
destination: aerospike
# Kafka producer initialization properties.
producer-props:
bootstrap.servers:
- b-3.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098
ssl.truststore.location: /etc/aerospike-kafka-outbound/kafka.client.truststore.jks
ssl.truststore.password: changeit
security.protocol: SASL_SSL
sasl.mechanism: AWS_MSK_IAM
sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=default;
sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
# The logging properties.
logging:
file: /var/log/aerospike-kafka-outbound/aerospike-kafka-outbound.log
enable-console-logging: true
levels:
root: debug
record-parser: debug
server: debug
com.aerospike.connect: debug
ticker-interval: 3600
4. Create the CA certificate trust store for use in the Kafka Outbound Connector config. You can see the SSL trust store location referenced in the file above as ssl.truststore.location
sudo cp /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts /etc/aerospike-kafka-outbound/kafka.client.truststore.jks
sudo chmod 755 /etc/aerospike-kafka-outbound/kafka.client.truststore.jks
5. Finally, make the AWS IAM Kafka Auth Jar file available to the Aerospike Outbound Kafka Connector. This is the same jar file that you downloaded and added to the kafka/libs folder.
sudo cp kafka_2.12-2.8.1/libs/aws-msk-iam-auth-1.1.1-all.jar /opt/aerospike-kafka-outbound/lib/aws-msk-iam-auth-1.1.1-all.jar
6. Start the service.
sudo systemctl enable aerospike-kafka-outbound
sudo systemctl start aerospike-kafka-outbound
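If the service does not come up, a useful first check is whether the files from the earlier steps are where the configuration expects them. The helper below is a sketch with an invented name, require_files; it only tests that each path exists as a regular file.

```shell
# Print each path that does not exist as a regular file; fail if any is missing.
require_files() {
  local path missing=0
  for path in "$@"; do
    if [ ! -f "$path" ]; then
      echo "missing: $path" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

Using the paths from this article, the call would be require_files /etc/aerospike-kafka-outbound/aerospike-kafka-outbound.yml /etc/aerospike-kafka-outbound/kafka.client.truststore.jks /opt/aerospike-kafka-outbound/lib/aws-msk-iam-auth-1.1.1-all.jar.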
Send data from Aerospike to Kafka
1. Open a separate window so you can list all messages on the aerospike Kafka topic. Start by adding one of the private endpoint bootstrap servers as an environment variable for ease of use.
export BootstrapServerString="b-3.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098"
2. Run the consumer client as follows:
./kafka-console-consumer.sh --bootstrap-server $BootstrapServerString --consumer.config client.properties --topic aerospike --from-beginning
3. In a new window, start AQL, the Aerospike command line client which connects to your Aerospike Database.
aql -U auser -P a-secret-pwd
4. Insert some data.
insert into test (pk, a) values(400, "Your winning lottery ticket awaits you")
5. Check to see if the message appears in the Kafka consumer window
{"metadata":{"namespace":"test","userKey":400,"digest":"W7eGav2hKfOU00xx7mnOPYa2uCo=","msg":"write","gen":1,"lut":1681488437767,"exp":0},"a":"Your winning lottery ticket awaits you"}
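The metadata block in this message can be picked apart with standard shell tools. As a sketch, the following pulls out the lut field and converts it to seconds, assuming lut is a last-update time expressed in milliseconds since the Unix epoch; extract_lut_ms is a hypothetical helper name.

```shell
# Extract the numeric "lut" value from one connector message on stdin.
extract_lut_ms() {
  sed -n 's/.*"lut":\([0-9][0-9]*\).*/\1/p'
}

# The sample message shown above.
msg='{"metadata":{"namespace":"test","userKey":400,"digest":"W7eGav2hKfOU00xx7mnOPYa2uCo=","msg":"write","gen":1,"lut":1681488437767,"exp":0},"a":"Your winning lottery ticket awaits you"}'
lut_ms=$(printf '%s\n' "$msg" | extract_lut_ms)

# Integer division drops the milliseconds (assumes Unix-epoch milliseconds).
echo "$((lut_ms / 1000))"
```

On a system with GNU date, the resulting seconds value can be passed to date -u -d "@SECONDS" to get a readable timestamp. For anything beyond a quick look, a proper JSON parser is a safer choice than sed.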
Conclusion
You've just discovered how straightforward it is to transmit data from Aerospike to AWS MSK Kafka while ensuring client authentication through AWS IAM permissions! From establishing an Aerospike Database from scratch to configuring the AWS MSK Kafka cluster and employing the Aerospike Outbound Kafka Connector, you've effortlessly constructed a real-time streaming data pipeline. Congratulations on this accomplishment!
Share your experience! Your feedback is important to us. Join our Aerospike community!