Queue ETL: Apache Kafka
Apache Kafka is a distributed, high-performance, transactional messaging platform that remains performant as the number of messages it processes increases and the volume of events it streams climbs into big-data territory.

Kafka ETL support allows RavenDB to take the role of an event producer in a Kafka architecture.

You can create a RavenDB Kafka ETL task to Extract data from the database, Transform it with a custom script, and Load the resulting JSON object to a Kafka destination as a CloudEvents message.
In this page:
- Transformation Script
- Data Delivery
- Client API
Transformation Script
The basic characteristics of a Kafka ETL script are similar to those of the other ETL types. The script defines which data to Extract from the database, how to Transform this data, and which Kafka topic to Load it to.

To load the data to a Kafka topic, use the loadTo<Topic> command as follows:
loadTo<Topic>(obj, {attributes})
- Topic: the Kafka topic name
- obj: the object to transfer
- attributes: optional attributes
For example:
// Create an OrderData JSON object
var orderData = {
    Id: id(this),
    OrderLinesCount: this.Lines.length,
    TotalCost: 0
};

// Update orderData's TotalCost field
for (var i = 0; i < this.Lines.length; i++) {
    var line = this.Lines[i];
    var cost = (line.Quantity * line.PricePerUnit) * (1 - line.Discount);
    orderData.TotalCost += cost;
}

// Topic name: Orders
// Loaded object name: orderData
// Attributes: Id, PartitionKey, Type, Source
loadToOrders(orderData, {
    Id: id(this),
    PartitionKey: id(this),
    Type: 'special-promotion',
    Source: '/promotion-campaigns/summer-sale'
})
Alternative Syntax
The target topic name can be passed to the loadTo command separately, as a string argument, using this syntax: loadTo('topic_name', obj, {attributes})
- Example:
  The following two calls to loadTo are equivalent:
loadToOrders(obj, {attributes})
loadTo('Orders', obj, {attributes})
- The target name 'Orders' in this syntax is not a variable and cannot be used as one: it is simply a string literal of the target's name.
- Separating the target name from the loadTo command makes it possible to include symbols like - and . in target names, e.g. loadTo('orders.summer-sale', obj, {attributes}). This is not possible with the standard loadToOrders syntax, because including special characters in the name of a JS function makes it invalid.
Data Delivery
What is Transferred

- Only Documents
  A Kafka ETL task transfers documents only. Document extensions like attachments, counters, or time series will not be transferred.
- As JSON Messages
  JSON objects produced by the task's transformation script are wrapped and delivered as CloudEvents messages.
How Messages Are Produced and Consumed
The ETL task sends the JSON messages it produces to the Kafka brokers defined by your connection string. Each message is then pushed to the tail of each topic assigned to it in the transformation script, advances in the queue as preceding messages are pulled, and finally reaches the queue's head, where it becomes available to consumers.

RavenDB publishes messages to Kafka using transactions.

Read more about Kafka clusters, brokers, topics, partitions, and other related subjects in the platform's official documentation or a variety of other sources.
Idempotence and Message Duplication
RavenDB is an idempotent producer that will not typically send duplicate messages to topics.

It is possible, however, that duplicate messages will be sent to the broker. For example:
- Different nodes of a RavenDB cluster are regarded as different producers by the broker. If the node responsible for the ETL task fails while sending a batch of messages, the new responsible node may resend messages that the broker has already received.
- It is, therefore, the consumer's own responsibility (if processing each message only once is important to it) to verify the uniqueness of each consumed message, as in the sketch below.
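For example, the following consumer is a minimal deduplication sketch using the third-party Confluent.Kafka client (not part of RavenDB). It assumes the CloudEvents id arrives in the ce_id Kafka header, per the CloudEvents Kafka binding's binary content mode, and uses an in-memory set where a production consumer would use a durable store:

using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "order-processors", // consumer group name (placeholder)
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
};

// In-memory deduplication set; a real consumer would persist seen ids.
var seenIds = new HashSet<string>();

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("Orders");

while (true)
{
    var cr = consumer.Consume();

    // The CloudEvents id is assumed to be carried in the "ce_id" header
    // (CloudEvents Kafka binding, binary content mode).
    var messageId = cr.Message.Headers.TryGetLastBytes("ce_id", out var idBytes)
        ? Encoding.UTF8.GetString(idBytes)
        : cr.Message.Key;

    // Process the message only if its id was not consumed before.
    if (messageId == null || seenIds.Add(messageId))
        Console.WriteLine($"Processing message: {cr.Message.Value}");

    consumer.Commit(cr);
}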
Client API
This section explains how to create a Kafka ETL task using code.
Learn here how to define a Kafka ETL task using Studio.
Add a Kafka Connection String
Prior to defining an ETL task, add a connection string that the task will use to connect to the message broker's bootstrap servers.
To create the connection string:
- Prepare a QueueConnectionString object with the connection string configuration.
- Pass this object to the PutConnectionStringOperation store operation to add the connection string.
Code Sample:
var res = store.Maintenance.Send(
new PutConnectionStringOperation<QueueConnectionString>(
new QueueConnectionString
{
Name = "KafkaConStr",
BrokerType = QueueBrokerType.Kafka,
KafkaConnectionSettings = new KafkaConnectionSettings()
{ BootstrapServers = "localhost:9092" }
}));
QueueConnectionString:

public class QueueConnectionString : ConnectionString
{
    public QueueBrokerType BrokerType { get; set; }

    // Configure if a Kafka connection string is needed
    public KafkaConnectionSettings KafkaConnectionSettings { get; set; }

    // Configure if a RabbitMq connection string is needed
    public RabbitMqConnectionSettings RabbitMqConnectionSettings { get; set; }
}
QueueBrokerType:

public enum QueueBrokerType
{
    None,
    Kafka,
    RabbitMq
}
| Property | Type | Description |
|---|---|---|
| Name | string | Connection string name |
| BrokerType | QueueBrokerType | Set to QueueBrokerType.Kafka for a Kafka connection string |
| KafkaConnectionSettings | KafkaConnectionSettings | Kafka connection settings; BootstrapServers is a comma-separated list of host:port URLs to Kafka brokers |
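For brokers that require authentication, additional client options can be passed through the connection settings. The following is a hedged sketch: it assumes KafkaConnectionSettings exposes a ConnectionOptions dictionary that forwards librdkafka configuration entries (verify the property in your client version), and the broker URLs and credentials are placeholders:

var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(
        new QueueConnectionString
        {
            Name = "KafkaSecuredConStr",
            BrokerType = QueueBrokerType.Kafka,
            KafkaConnectionSettings = new KafkaConnectionSettings
            {
                // Comma-separated host:port URLs of the Kafka brokers (placeholders)
                BootstrapServers = "broker1:9092,broker2:9092",
                // librdkafka security options (ConnectionOptions is an assumption;
                // check the property name in your client version)
                ConnectionOptions = new Dictionary<string, string>
                {
                    ["security.protocol"] = "SASL_SSL",
                    ["sasl.mechanism"] = "PLAIN",
                    ["sasl.username"] = "myUser",
                    ["sasl.password"] = "myPassword"
                }
            }
        }));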
Add a Kafka ETL Task
To create the ETL task:
- Prepare a QueueEtlConfiguration object with the ETL task configuration.
- Pass this object to the AddEtlOperation store operation to add the ETL task.
Code Sample:
// use PutConnectionStringOperation to add connection string
var res = store.Maintenance.Send(
new PutConnectionStringOperation<QueueConnectionString>(
new QueueConnectionString
{
Name = "KafkaConStr",
BrokerType = QueueBrokerType.Kafka,
KafkaConnectionSettings = new KafkaConnectionSettings() { BootstrapServers = "localhost:9092" }
}));
// create transformation script
Transformation transformation = new Transformation
{
Name = "scriptName",
Collections = { "Orders" },
Script = @"var orderData = {
Id: id(this),
OrderLinesCount: this.Lines.length,
TotalCost: 0
};
for (var i = 0; i < this.Lines.length; i++) {
var line = this.Lines[i];
var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
orderData.TotalCost += cost;
}
loadToOrders(orderData, {
Id: id(this),
PartitionKey: id(this),
Type: 'special-promotion',
Source: '/promotion-campaigns/summer-sale'
});",
ApplyToAllDocuments = false
};
// use AddEtlOperation to add ETL task
AddEtlOperation<QueueConnectionString> operation = new AddEtlOperation<QueueConnectionString>(
new QueueEtlConfiguration()
{
Name = "KafkaEtlTaskName",
ConnectionStringName = "KafkaConStr",
Transforms =
{
transformation
},
Queues = { new EtlQueue() { Name = "Orders" } },
BrokerType = QueueBrokerType.Kafka,
// Do not prevent a failover to another node
PinToMentorNode = false
});
store.Maintenance.Send(operation);
QueueEtlConfiguration properties:

| Property | Type | Description |
|---|---|---|
| Name | string | The ETL task name |
| ConnectionStringName | string | The registered connection string name |
| Transforms | List<Transformation> | Your transformation scripts |
| Queues | List<EtlQueue> | Optional actions to take when a document is processed; see Delete Processed Documents below |
| BrokerType | QueueBrokerType | Set to QueueBrokerType.Kafka to define a Kafka ETL task |
Delete Processed Documents
You can include an optional EtlQueue property in the ETL configuration to trigger additional actions. One action that you can trigger this way is the deletion of RavenDB documents once they've been processed by the ETL task.
EtlQueue
public class EtlQueue
{
    public string Name { get; set; }
    public bool DeleteProcessedDocuments { get; set; }
}
| Property | Type | Description |
|---|---|---|
| Name | string | Queue name |
| DeleteProcessedDocuments | bool | If true, delete processed documents from RavenDB |
Code Sample:
var operation = new AddEtlOperation<QueueConnectionString>(
    new QueueEtlConfiguration()
    {
        Name = "KafkaEtlTaskName",
        ConnectionStringName = "KafkaConStr",
        Transforms =
        {
            transformation
        },
        // Only define if you want to delete documents from RavenDB after they are processed.
        Queues = new List<EtlQueue>()
        {
            new()
            {
                // Documents that were processed by the transformation script will be
                // deleted from RavenDB after the message is loaded to the Orders queue.
                Name = "Orders",
                DeleteProcessedDocuments = true
            }
        },
        BrokerType = QueueBrokerType.Kafka
    });
store.Maintenance.Send(operation);