Queue ETL: Apache Kafka



Transformation Script

The basic characteristics of a Kafka ETL script are similar to those of the other ETL types.
The script defines what data to Extract from the database, how to Transform this data, and which Kafka topic to Load it to.

To load the data to a Kafka topic, use the loadTo<Topic> command as follows:
loadTo<Topic>(obj, {attributes})

For example:

// Create an OrderData JSON object
var orderData = {
    Id: id(this),
    OrderLinesCount: this.Lines.length,
    TotalCost: 0
};

// Update orderData's TotalCost field
for (var i = 0; i < this.Lines.length; i++) {
    var line = this.Lines[i];
    var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
    orderData.TotalCost += cost;
}

// Topic name: Orders
// Loaded object name: orderData
// Attributes: Id, PartitionKey, Type, Source
loadToOrders(orderData, {  
    Id: id(this),
    PartitionKey: id(this),
    Type: 'special-promotion',
    Source: '/promotion-campaigns/summer-sale'
})

Alternative Syntax

The target topic name can be passed to the loadTo command separately, as a string argument, using this syntax: loadTo('topic_name', obj, {attributes})

  • Example:
    The following two calls to loadTo are equivalent.
    loadToOrders(obj, {attributes})
    loadTo('Orders', obj, {attributes})

  • The target name 'Orders' in this syntax is not a variable and cannot be used as one: it is simply a string literal of the target's name.
  • Separating the target name from the loadTo command makes it possible to include symbols like - and . in target names, e.g. loadTo('promotions.summer-sale', obj, {attributes}). This is not possible with the standard loadToOrders syntax, because special characters would make the name of the JS function invalid.

Data Delivery


What is Transferred

  • Only Documents
    A Kafka ETL task transfers documents only.
    Document extensions like attachments, counters, or time series will not be transferred.

  • As JSON Messages
    JSON objects produced by the task's transformation script are wrapped and delivered as CloudEvents messages; see the consumer sketch below.
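
For illustration, here is a minimal consumer sketch using the Confluent.Kafka client. It assumes the CloudEvents attributes are delivered as ce_-prefixed Kafka headers (the CloudEvents binary content mode); the broker address, consumer group, and topic name are placeholders, not values mandated by RavenDB.

using System;
using System.Text;
using Confluent.Kafka;

public static class OrdersConsumerSketch
{
    public static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",   // placeholder broker address
            GroupId = "orders-consumer-group",     // hypothetical consumer group
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("Orders"); // the topic loaded to by loadToOrders

        var result = consumer.Consume(TimeSpan.FromSeconds(10));
        if (result == null)
            return;

        // The message value is the JSON object produced by the transformation script
        Console.WriteLine($"Payload: {result.Message.Value}");

        // CloudEvents attributes (id, type, source, ...) are expected as ce_-prefixed headers
        foreach (var header in result.Message.Headers)
            Console.WriteLine($"{header.Key} = {Encoding.UTF8.GetString(header.GetValueBytes())}");

        consumer.Close();
    }
}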


How Are Messages Produced and Consumed

The ETL task sends the JSON messages it produces to the Kafka broker(s) specified by your connection string.

Each message is then pushed to the tail of each topic assigned to it in the transformation script, advances in the queue as preceding messages are pulled, and finally reaches the queue's head, where it becomes available to consumers.

RavenDB publishes messages to Kafka using transactions.
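
Because messages are produced transactionally, a consumer that should only see committed messages can opt into that explicitly. A minimal configuration sketch, reusing the same Confluent.Kafka ConsumerConfig as in the sketch above (broker address and group id remain placeholders; ReadCommitted is typically the client's default):

using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // placeholder broker address
    GroupId = "orders-consumer-group",     // hypothetical consumer group

    // Only read messages belonging to transactions that the producer has committed
    IsolationLevel = IsolationLevel.ReadCommitted
};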

Read more about Kafka clusters, brokers, topics, partitions, and other related subjects in the platform's official documentation or a variety of other sources.


Idempotence and Message Duplication

RavenDB is an idempotent producer that does not typically send duplicate messages to topics.

  • It is possible, however, that duplicate messages will be sent to the broker.
    For example:
    Different nodes of a RavenDB cluster will be regarded as different producers by the broker.
    If the node responsible for the ETL task fails while sending a batch of messages, the new responsible node may resend messages that were already received by the broker.

  • It is therefore the consumer's own responsibility (if processing each message only once is important to it) to verify the uniqueness of each consumed message, as sketched below.
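
Below is a minimal in-memory de-duplication sketch. It assumes the consumer extracts a unique message id for each consumed message (for example, the Id attribute set by the transformation script, or the CloudEvents id header); in production the set of processed ids would normally be kept in a durable store rather than in memory.

using System;
using System.Collections.Generic;

public class DeduplicatingHandler
{
    // Ids of messages that were already processed by this consumer
    private readonly HashSet<string> _processedIds = new();

    // Returns true if the message was handled, false if it was a duplicate and skipped
    public bool Handle(string messageId, string payload)
    {
        if (!_processedIds.Add(messageId))
        {
            // Already seen: the broker may deliver duplicates, e.g. after an ETL failover
            return false;
        }

        ProcessOrder(payload);
        return true;
    }

    private void ProcessOrder(string payload)
    {
        // Hypothetical application logic
        Console.WriteLine($"Processing order message: {payload}");
    }
}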

Client API

This section explains how to create a Kafka ETL task using code.
Learn here how to define a Kafka ETL task using Studio.


Add a Kafka Connection String

Prior to defining an ETL task, add a connection string that the task will use to connect to the message broker's bootstrap servers.

To create the connection string:

  • Prepare a QueueConnectionString object with the connection string configuration.
  • Pass this object to the PutConnectionStringOperation store operation to add the connection string.

Code Sample:

var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(
        new QueueConnectionString
        {
            Name = "KafkaConStr",
            BrokerType = QueueBrokerType.Kafka,
            KafkaConnectionSettings = new KafkaConnectionSettings() 
                { BootstrapServers = "localhost:9092" }
        }));
  • QueueConnectionString:

    public class QueueConnectionString : ConnectionString
    {
        public QueueBrokerType BrokerType { get; set; }
        // Configure if a Kafka connection string is needed
        public KafkaConnectionSettings KafkaConnectionSettings { get; set; }
        // Configure if a RabbitMq connection string is needed
        public RabbitMqConnectionSettings RabbitMqConnectionSettings { get; set; }
    }
    QueueBrokerType:
    public enum QueueBrokerType
    {
        None,
        Kafka,
        RabbitMq
    }
    Property Type Description
    Name string Connection string name
    BrokerType QueueBrokerType Set to QueueBrokerType.Kafka for a Kafka connection string
    KafkaConnectionSettings KafkaConnectionSettings Kafka connection settings; the BootstrapServers property holds a comma-separated list of host:port URLs of Kafka brokers
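
For example, a connection string that lists several bootstrap servers can be registered as follows (the host names are placeholders for your own brokers):

var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(
        new QueueConnectionString
        {
            Name = "KafkaClusterConStr",
            BrokerType = QueueBrokerType.Kafka,
            KafkaConnectionSettings = new KafkaConnectionSettings()
            {
                // Comma-separated host:port URLs of the cluster's bootstrap servers
                BootstrapServers = "kafka-1:9092,kafka-2:9092,kafka-3:9092"
            }
        }));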

Add a Kafka ETL Task

To create the ETL task:

  • Prepare a QueueEtlConfiguration object with the ETL task configuration.
  • Pass this object to the AddEtlOperation store operation to add the ETL task.

Code Sample:

// use PutConnectionStringOperation to add connection string
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(
        new QueueConnectionString
        {
            Name = "KafkaConStr",
            BrokerType = QueueBrokerType.Kafka,
            KafkaConnectionSettings = new KafkaConnectionSettings() { BootstrapServers = "localhost:9092" }
        }));

// create transformation script
Transformation transformation = new Transformation
{
    Name = "scriptName",
    Collections = { "Orders" },
    Script = @"var orderData = {
                Id: id(this),
                OrderLinesCount: this.Lines.length,
                TotalCost: 0
            };

            for (var i = 0; i < this.Lines.length; i++) {
                var line = this.Lines[i];
                var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
                orderData.TotalCost += cost;
            }

            loadToOrders(orderData, {
                Id: id(this),
                PartitionKey: id(this),
                Type: 'special-promotion',
                Source: '/promotion-campaigns/summer-sale'
            });",
    ApplyToAllDocuments = false
};

// use AddEtlOperation to add ETL task 
AddEtlOperation<QueueConnectionString> operation = new AddEtlOperation<QueueConnectionString>(
    new QueueEtlConfiguration()
    {
        Name = "KafkaEtlTaskName",
        ConnectionStringName = "KafkaConStr",
        Transforms =
        {
            transformation
        },
        Queues = { new EtlQueue() { Name = "Orders" } },
        BrokerType = QueueBrokerType.Kafka,

        // Do not prevent a failover to another node
        PinToMentorNode = false
    });
store.Maintenance.Send(operation);
  • QueueEtlConfiguration properties:

    Property Type Description
    Name string The ETL task name
    ConnectionStringName string The registered connection string name
    Transforms List<Transformation> The transformation scripts the task applies
    Queues List<EtlQueue> Optional actions to take when a document is processed; see Delete Processed Documents below.
    BrokerType QueueBrokerType Set to QueueBrokerType.Kafka to define a Kafka ETL task

Delete Processed Documents

You can include an optional EtlQueue property in the ETL configuration to trigger additional actions.
An action that you can trigger this way is the deletion of RavenDB documents once they've been processed by the ETL task.

EtlQueue

public class EtlQueue
{
    public string Name { get; set; }
    public bool DeleteProcessedDocuments { get; set; }
}
Property Type Description
Name string Queue name
DeleteProcessedDocuments bool If true, processed documents are deleted from RavenDB

Code Sample:

new QueueEtlConfiguration()
{
    Name = "KafkaEtlTaskName",
    ConnectionStringName = "KafkaConStr",
    Transforms =
        {
            transformation
        },
    // Only define if you want to delete documents from RavenDB after they are processed.
    Queues =  
        new List<EtlQueue>() 
        {
            new()
            {
                // Documents that were processed by the transformation script will be
                // deleted from RavenDB after the message is loaded to the Orders queue.
                Name = "Orders",
                DeleteProcessedDocuments = true
            }
        },
    BrokerType = QueueBrokerType.Kafka
});