AWS DMS to Azure EventHub through Kafka
Problem Statement
You have transactional RDS table data in AWS and need to move it to Azure to perform data analysis with Power BI. This should be done in a way that is optimized for both ease and cost, and it requires building data pipelines on both the AWS and the Azure side.
Solution
We can think of a couple of possible options here.
RDS-DMS-S3-AzureDataFactory
In this case, the source RDS database has to be configured first to prepare it for CDC (Change Data Capture). In my case, the source was an Aurora MySQL database, and it had to be configured for binary logging as described here: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.MySQL.html#CHAP_Source.MySQL.AmazonManaged (section "Using an AWS-managed MySQL-compatible database as a source for AWS DMS").
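In short, the cluster parameter group must set binlog_format to ROW and the binary logs must be retained long enough for DMS to read them. A minimal sketch of that configuration with the AWS CLI, assuming a custom cluster parameter group (the group name and the retention value are placeholders; follow the linked documentation for your setup):
# Set binlog_format to ROW in the custom cluster parameter group attached to the Aurora cluster
aws rds modify-db-cluster-parameter-group \
  --db-cluster-parameter-group-name <your_cluster_parameter_group> \
  --parameters "ParameterName=binlog_format,ParameterValue=ROW,ApplyMethod=pending-reboot"
# Then, on the source database itself, keep binlogs around long enough for DMS to consume them:
#   CALL mysql.rds_set_configuration('binlog retention hours', 24);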
Now you need to create a DMS replication instance; here you can choose between EC2-based instances and serverless. First, create a target endpoint that drops the files from the source table onto an S3 bucket. Then create a task configured to execute a full dump plus CDC on the source table, roughly as sketched below.
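A rough sketch of these two steps with the AWS CLI (all ARNs, bucket, schema and table names are placeholders):
# Target endpoint that writes the source table data as files to an S3 bucket
aws dms create-endpoint \
  --endpoint-identifier <s3_endpoint_name> \
  --endpoint-type target \
  --engine-name s3 \
  --s3-settings '{
    "BucketName": "<bucket_name>",
    "ServiceAccessRoleArn": "<iam_role_arn_with_s3_write_access>"
  }'
# Task that runs a full load followed by ongoing CDC on the selected table
aws dms create-replication-task \
  --replication-task-identifier <task_name> \
  --migration-type full-load-and-cdc \
  --source-endpoint-arn <source_endpoint_arn> \
  --target-endpoint-arn <s3_target_endpoint_arn> \
  --replication-instance-arn <replication_instance_arn> \
  --table-mappings '{
    "rules": [{
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "include-source-table",
      "object-locator": { "schema-name": "<schema>", "table-name": "<table>" },
      "rule-action": "include"
    }]
  }'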
On the Azure side, the best option is to use Azure Data Factory (via the OneLake data hub), where you can create a Spark job to read the files from S3.
This approach works fine, but it has two major drawbacks.
- If the transactional table has very frequent updates, DMS generates a huge number of S3 writes for these delta changes (CDC records). Although S3 storage is cheap, the increase in writes caused a spike in the S3 cost.
- The Spark job also has to keep track of which records it has already read to keep the CDC stream continuous, and its polling of S3 further increased the S3 cost.
Hence, a better option was needed.
RDS-DMS-EventHub-AzureDataFactory
DMS supports a Kafka endpoint as a target, and conveniently, Azure Event Hub supports communication over the Kafka protocol. So this seemed to be a better solution, since Event Hub is charged per million messages and a Spark job can read from Event Hub more efficiently than from S3, because:
- The messages are ordered
- Messages are retained for 24 hours by default, so a consumer that falls behind does not lose them
- Consumers track their read position with offsets, so already-read messages are not processed again
But the problem was that I was unable to create a Kafka endpoint on the AWS side. Neither Azure support nor AWS support had documentation on how to do this. After working on it for a few weeks, we finally cracked the solution together with the help of support.
I first tried to create a Kafka endpoint in the AWS DMS console, but there was confusion about which attributes are required to connect successfully to Event Hub. The DMS documentation (https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kafka.html) is not clear about which endpoint settings need to be used.
The following sample, https://github.com/Azure/azure-event-hubs-for-kafka, helped to connect to Azure Event Hub and post a message. From Azure support, it was found that sasl.username and sasl.password have to be in the format mentioned in the sample. But the AWS console's endpoint settings input doesn't allow special characters like $.
AWS support helped to find a workaround: creating the endpoint using the AWS CLI.
aws dms create-endpoint \
  --endpoint-identifier <endpoint_name> \
  --endpoint-type target \
  --engine-name kafka \
  --kafka-settings '{
    "Broker": "<namespace>.servicebus.windows.net:9093",
    "Topic": "<eventhub_name>",
    "SecurityProtocol": "Sasl-ssl",
    "SaslMechanism": "PLAIN",
    "SaslUsername": "$ConnectionString",
    "SaslPassword": "Endpoint=sb://..... (connection string from the Shared Access Key)"
  }'
Also note that this worked only after upgrading the AWS CLI to the latest version (aws-cli/2.15.7 as of this writing). After that, the endpoint connection test succeeded and DMS was able to post the CDC messages successfully.
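If you prefer to run the connection test from the CLI as well, it can be triggered and checked roughly like this (the ARNs are placeholders):
# Trigger the connection test from the replication instance to the new Kafka endpoint
aws dms test-connection \
  --replication-instance-arn <replication_instance_arn> \
  --endpoint-arn <kafka_endpoint_arn>
# Poll the result; the status becomes "successful" once the handshake with Event Hub works
aws dms describe-connections \
  --filters Name=endpoint-arn,Values=<kafka_endpoint_arn>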
After posting a message, you can't read the messages directly in the console; you need client code to subscribe to them, as in the sketch below.
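Here is a minimal consumer sketch using the kafkajs package (an assumption on my side; it is not the library used in the Azure sample) that subscribes to the event hub over the Kafka protocol:
const { Kafka } = require('kafkajs');

// Placeholders: replace with your Event Hubs namespace, hub name and connection string
const namespace = '<namespace>';
const hubName = '<eventhubName>';
const connectionString = '<Endpoint=sb://... connection string from the Shared Access Key>';

const kafka = new Kafka({
  clientId: 'dms-cdc-consumer',
  brokers: [`${namespace}.servicebus.windows.net:9093`],
  ssl: true,
  sasl: {
    mechanism: 'plain',
    username: '$ConnectionString', // literal value, same as in the DMS endpoint settings
    password: connectionString,
  },
});

// '$Default' is the default Event Hubs consumer group
const consumer = kafka.consumer({ groupId: '$Default' });

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: hubName, fromBeginning: true });
  await consumer.run({
    // DMS posts each CDC record as a JSON message
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`${topic}[${partition}] ${message.value.toString()}`);
    },
  });
}

run().catch(console.error);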
Sending Messages from a NodeJS Client
If you have to send messages from a NodeJS client, it is better NOT to use the producer.on('ready') event to send messages (as described in https://github.com/Azure/azure-event-hubs-for-kafka), because the cost of creating the producer instance is heavy. Instead, it is better to go with an HTTP-based approach as follows:
const axios = require('axios');
const crypto = require('crypto');

const namespace = '<namespace>'; // replace with your Event Hubs namespace
const hubName = '<eventhubName>'; // replace with your Event Hub name
const sasKeyName = '<sasKeyName>'; // replace with your SAS key name
const sasKeyValue = '<sasKeyValue>'; // replace with your SAS key value

// Build a SAS token (valid for one week) for the Event Hubs REST API
function createSasToken(uri, sasKeyName, sasKeyValue) {
  const encoded = encodeURIComponent(uri);
  const now = new Date();
  const week = 60 * 60 * 24 * 7;
  const ttl = Math.round(now.getTime() / 1000) + week;
  const signature = encoded + '\n' + ttl;
  const signatureUTF8 = crypto.createHmac('sha256', sasKeyValue).update(signature).digest('base64');
  return 'SharedAccessSignature sr=' + encoded + '&sig=' + encodeURIComponent(signatureUTF8) + '&se=' + ttl + '&skn=' + sasKeyName;
}

const sasToken = createSasToken(`https://${namespace}.servicebus.windows.net/${hubName}`, sasKeyName, sasKeyValue);

// Send 10 test messages
const maxMessages = 10;
for (let i = 0; i < maxMessages; i++) {
  const message = {
    "name": `person${i}`
  };
  axios.post(`https://${namespace}.servicebus.windows.net/${hubName}/messages`, message, {
    headers: {
      'Authorization': sasToken,
      'Content-Type': 'application/atom+xml;type=entry;charset=utf-8'
    }
  })
    .then((response) => {
      console.log(`Message sent successfully: ${JSON.stringify(response.data)}`);
    })
    .catch((error) => {
      console.error(`Failed to send message: ${error}`);
    });
}
Details of the changes to be made on the Azure side will be explained in another story.