Queue ETL: RabbitMQ



Transformation Script

The basic characteristics of a RabbitMQ ETL script are similar to those of the other ETL types.
The script defines what data to Extract from the database, how to Transform this data, and which queue or queues to Load it to.

To load the data to RabbitMQ, use the loadTo<Exchange> command as follows:
loadTo<Exchange>(obj, routingKey, {attributes})

  • Exchange:
    The RabbitMQ exchange name
  • obj:
    The object to transfer
  • routingKey:
    A message attribute that the exchange checks when it decides how to route the message to queues (depending on the exchange type)
  • attributes:
    Optional attributes

For example:

// Create an OrderData JSON object
var orderData = {
    Id: id(this), 
    OrderLinesCount: this.Lines.length,
    TotalCost: 0
};

// Update orderData's TotalCost field
for (var i = 0; i < this.Lines.length; i++) {
    var line = this.Lines[i];
    var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
    orderData.TotalCost += cost;
}

// Exchange name: Orders
// Loaded object name: orderData
// Routing key: users 
// Attributes: Id, PartitionKey, Type, Source
loadToOrders(orderData, `users`, {  
    Id: id(this),
    PartitionKey: id(this),
    Type: 'special-promotion',
    Source: '/promotion-campaigns/summer-sale'
});
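
To check the arithmetic of a script like this before deploying it, the same logic can be run outside RavenDB. The following is a minimal sketch in plain JavaScript. The sample document and the stand-ins for id() and loadToOrders are assumptions made for illustration only; inside a real ETL task these functions are supplied by RavenDB.

```javascript
// Hypothetical sample document; field names follow the script above.
const sampleOrder = {
    "@metadata": { "@id": "orders/1-A" },
    Lines: [
        { Quantity: 2, PricePerUnit: 10, Discount: 0.5 },
        { Quantity: 1, PricePerUnit: 5, Discount: 0 }
    ]
};

// Stand-in for the id() helper (assumption: it returns the document ID).
function id(doc) {
    return doc["@metadata"]["@id"];
}

// Stand-in that records what would be loaded instead of publishing it.
const loaded = [];
function loadToOrders(obj, routingKey, attributes) {
    loaded.push({ exchange: "Orders", obj, routingKey, attributes });
}

// Same logic as the transformation script; `this` is the current document.
function transform() {
    var orderData = {
        Id: id(this),
        OrderLinesCount: this.Lines.length,
        TotalCost: 0
    };

    for (var i = 0; i < this.Lines.length; i++) {
        var line = this.Lines[i];
        orderData.TotalCost += (line.Quantity * line.PricePerUnit) * (1 - line.Discount);
    }

    loadToOrders(orderData, "users", {
        Id: id(this),
        PartitionKey: id(this),
        Type: "special-promotion",
        Source: "/promotion-campaigns/summer-sale"
    });
}

transform.call(sampleOrder);
console.log(loaded[0].obj);
```

With the sample lines above, the first line contributes 10 and the second contributes 5, so the recorded object has a TotalCost of 15.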

Alternative Syntaxes

Alternative supported syntaxes include:

  • The exchange name can be provided separately, as a string:
    loadTo('exchange-name', obj, 'routing-key', { attributes })
    For example: loadTo('Orders', orderData, 'users')

    Using this syntax, you can replace the exchange name with an empty string, as in: loadTo('', orderData, 'users')
    When an empty string is sent this way the message is pushed directly to queues, using a default exchange (pre-defined by the broker).
    In the above example, loadTo('', orderData, 'users'), the message is pushed directly to the users queue.

  • The routing key can be omitted, as in: loadToOrders(orderData)
    Without a routing key, message delivery depends on the type of exchange you use.

  • Additional attributes (like the CloudEvents-specific Id, Type, and Source attributes) can be omitted.
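
How the routing key affects delivery for different exchange types can be illustrated with a toy model. This is a sketch of the routing rules only, not a RabbitMQ client; the route function and the binding objects are hypothetical names introduced for the example. It covers the fanout and direct types and the broker's default (empty-name) exchange, to which every queue is bound under its own name.

```javascript
// Toy model of exchange routing. `bindings` lists the queues bound to the
// exchange, each with the binding key it was bound under.
function route(exchangeType, bindings, routingKey) {
    switch (exchangeType) {
        case "fanout":
            // A fanout exchange ignores the routing key entirely.
            return bindings.map(b => b.queue);
        case "direct":
            // A direct exchange delivers to queues whose binding key matches.
            return bindings.filter(b => b.bindingKey === routingKey).map(b => b.queue);
        case "default":
            // The default exchange routes to the queue named by the routing key.
            return bindings.filter(b => b.queue === routingKey).map(b => b.queue);
        default:
            throw new Error("Unsupported exchange type: " + exchangeType);
    }
}

const bindings = [
    { queue: "users", bindingKey: "users" },
    { queue: "audit", bindingKey: "audit" }
];

console.log(route("fanout", bindings, undefined)); // both queues
console.log(route("direct", bindings, "users"));   // only "users"
console.log(route("direct", bindings, undefined)); // no queue matches
console.log(route("default", bindings, "users"));  // only "users"
```

In this model, omitting the routing key is harmless with a fanout exchange but leaves a direct exchange with no matching queue.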

Data Delivery

What is Transferred

  • Only Documents
    A RabbitMQ ETL task transfers documents only.
    Document extensions like attachments, counters, or time series, will not be transferred.

  • As JSON Messages
    JSON objects produced by the task's transformation script are wrapped and delivered as CloudEvents Messages.
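
To give a rough idea of what wrapping a JSON object as a CloudEvents message means, the sketch below builds an envelope using the attribute names defined by the CloudEvents 1.0 specification (specversion, id, type, source, datacontenttype, data). The toCloudEvent function is hypothetical, and the exact envelope and content mode that RavenDB produces are not described here, so treat the output shape as an assumption.

```javascript
// Builds a CloudEvents 1.0 style envelope around a payload.
// `attributes` uses the names passed to loadTo<Exchange> in the script.
function toCloudEvent(data, attributes) {
    return {
        specversion: "1.0",
        id: attributes.Id,
        type: attributes.Type,
        source: attributes.Source,
        // Assumption: PartitionKey maps to the CloudEvents partitioning extension.
        partitionkey: attributes.PartitionKey,
        datacontenttype: "application/json",
        data: data
    };
}

const event = toCloudEvent(
    { Id: "orders/1-A", OrderLinesCount: 2, TotalCost: 15 },
    {
        Id: "orders/1-A",
        PartitionKey: "orders/1-A",
        Type: "special-promotion",
        Source: "/promotion-campaigns/summer-sale"
    }
);

console.log(JSON.stringify(event, null, 2));
```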


How Are Messages Produced and Consumed

The ETL task will use the address provided in your connection string, and send the JSON messages it produces to a RabbitMQ exchange.

Each message will then be pushed to the tail of the queue assigned to it in the transformation script, advance in the queue as preceding messages are pulled, and finally reach the queue's head and become available for consumers.

RavenDB publishes messages to RabbitMQ using transactions and batches, creating a batch of messages and opening a transaction to the exchange for the batch.

Read more about RabbitMQ in the platform's official documentation or a variety of other sources.


Message Duplication

It is possible that duplicate messages will be sent to the exchange.

If, for example, the RavenDB node responsible for the ETL task fails while sending messages, the new responsible node may resend some of the messages that were already enqueued.

If processing each message only once matters to a consumer, it is therefore the consumer's responsibility to verify the uniqueness of each message it consumes.
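
One common way for a consumer to guard against duplicates is to remember the identifiers of messages it has already handled. The sketch below is a minimal in-memory version, assuming each message exposes an id field that is unique per message. The createDeduplicator function and the message shape are hypothetical. Note that if the script sets Id to the document ID, later updates of the same document would share that value, so a real key would need to include some version marker as well.

```javascript
// Wraps a handler so that a message ID is processed at most once.
// The Set lives in memory only; a production consumer would need durable storage.
function createDeduplicator(handler) {
    const seen = new Set();
    return function onMessage(message) {
        if (seen.has(message.id)) {
            return false; // duplicate: skip it
        }
        handler(message);
        // Mark as seen only after the handler succeeded.
        seen.add(message.id);
        return true;
    };
}

const processed = [];
const onMessage = createDeduplicator(m => processed.push(m.id));

onMessage({ id: "a" }); // handled
onMessage({ id: "a" }); // skipped as a duplicate
onMessage({ id: "b" }); // handled

console.log(processed);
```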

Client API

This section explains how to create a RabbitMQ ETL task using code.
Learn here how to define a RabbitMQ ETL task using Studio.


Add a RabbitMQ Connection String

Prior to defining an ETL task, add a connection string that the task will use to connect to RabbitMQ.

To create the connection string:

  • Prepare a QueueConnectionString object with the connection string configuration.
  • Pass this object to the PutConnectionStringOperation store operation to add the connection string.

Code Sample:

var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(
        new QueueConnectionString
        {
            Name = "RabbitMqConStr",
            BrokerType = QueueBrokerType.RabbitMq,
            RabbitMqConnectionSettings = new RabbitMqConnectionSettings() 
                { ConnectionString = "amqp://guest:guest@localhost:49154" }
        }));
  • QueueConnectionString:

    public class QueueConnectionString : ConnectionString
    {
        public QueueBrokerType BrokerType { get; set; }
        // Configure if a Kafka connection string is needed
        public KafkaConnectionSettings KafkaConnectionSettings { get; set; }
        // Configure if a RabbitMq connection string is needed
        public RabbitMqConnectionSettings RabbitMqConnectionSettings { get; set; }
    }

  • QueueBrokerType:

    public enum QueueBrokerType
    {
        None,
        Kafka,
        RabbitMq
    }

    Property                   | Type                       | Description
    Name                       | string                     | Connection string name
    BrokerType                 | QueueBrokerType            | Set to QueueBrokerType.RabbitMq for a RabbitMQ connection string
    RabbitMqConnectionSettings | RabbitMqConnectionSettings | Settings object whose ConnectionString property is a single string that specifies the RabbitMQ connection details
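
The connection string used in the sample above is an AMQP URI that packs the user name, password, host, and port into one string. As an illustration of those parts only (this is not part of the RavenDB API), the sketch below splits the same sample URI with the standard URL class available in JavaScript runtimes such as Node.js.

```javascript
// Split the sample AMQP URI into its components.
const amqpUri = new URL("amqp://guest:guest@localhost:49154");

const parts = {
    scheme: amqpUri.protocol.replace(":", ""),
    user: amqpUri.username,
    password: amqpUri.password,
    host: amqpUri.hostname,
    port: Number(amqpUri.port)
};

console.log(parts);
```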

Add a RabbitMQ ETL Task

To create the ETL task:

  • Prepare a QueueEtlConfiguration object with the ETL task configuration.
  • Pass this object to the AddEtlOperation store operation to add the ETL task.

Code Sample:

// use PutConnectionStringOperation to add connection string
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(
        new QueueConnectionString
        {
            Name = "RabbitMqConStr",
            BrokerType = QueueBrokerType.RabbitMq,
            RabbitMqConnectionSettings = new RabbitMqConnectionSettings() { ConnectionString = "amqp://guest:guest@localhost:49154" }
        }));

// create transformation script
Transformation transformation = new Transformation
{
    Name = "scriptName",
    Collections = { "Orders" },
    Script = @"var orderData = {
                Id: id(this), 
                OrderLinesCount: this.Lines.length,
                TotalCost: 0
            };

            for (var i = 0; i < this.Lines.length; i++) {
                var line = this.Lines[i];
                var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
                orderData.TotalCost += cost;
            }

            loadToOrders(orderData, `routingKey`, {  
                Id: id(this),
                PartitionKey: id(this),
                Type: 'special-promotion',
                Source: '/promotion-campaigns/summer-sale'
            });",
    ApplyToAllDocuments = false
};

// use AddEtlOperation to add ETL task 
AddEtlOperation<QueueConnectionString> operation = new AddEtlOperation<QueueConnectionString>(
new QueueEtlConfiguration()
{
    Name = "RabbitMqEtlTaskName",
    ConnectionStringName = "RabbitMqConStr",
    Transforms =
        {
            transformation
        },
    Queues = { new EtlQueue() { Name = "Orders" } },
    BrokerType = QueueBrokerType.RabbitMq,
    SkipAutomaticQueueDeclaration = false,

    // Do not prevent a failover to another node
    PinToMentorNode = false
});
store.Maintenance.Send(operation);

  • QueueEtlConfiguration:

    Property                      | Type                 | Description
    Name                          | string               | The ETL task name
    ConnectionStringName          | string               | The registered connection string name
    Transforms                    | List<Transformation> | Your transformation scripts
    Queues                        | List<EtlQueue>       | Optional actions to take when a document is processed; see Delete Processed Documents below
    BrokerType                    | QueueBrokerType      | Set to QueueBrokerType.RabbitMq to define a RabbitMQ ETL task
    SkipAutomaticQueueDeclaration | bool                 | Set to true to skip automatic queue declaration. Use this option when you prefer to define exchanges, queues, and bindings manually.

By default, the ETL task declares the exchanges itself, using the Fanout type, so routing keys are ignored.


Delete Processed Documents

You can include an optional list of EtlQueue objects (the Queues property) in the ETL configuration to trigger additional actions.
One action that you can trigger this way is the deletion of RavenDB documents once they've been processed by the ETL task.

EtlQueue

public class EtlQueue
{
    public string Name { get; set; }
    public bool DeleteProcessedDocuments { get; set; }
}

Property                 | Type   | Description
Name                     | string | Queue name
DeleteProcessedDocuments | bool   | If true, delete processed documents from RavenDB

Code Sample:

// use AddEtlOperation to add an ETL task that deletes processed documents
AddEtlOperation<QueueConnectionString> operation = new AddEtlOperation<QueueConnectionString>(
    new QueueEtlConfiguration()
    {
        Name = "RabbitMqEtlTaskName",
        ConnectionStringName = "RabbitMqConStr",
        Transforms =
            {
                transformation
            },
        // Only define if you want to delete documents from RavenDB after they are processed.
        Queues =
            new List<EtlQueue>()
            {
                new()
                {
                    // Documents that were processed by the transformation script will be
                    // deleted from RavenDB after the message is loaded to the Orders queue.
                    Name = "Orders",
                    DeleteProcessedDocuments = true
                }
            },
        BrokerType = QueueBrokerType.RabbitMq,
        SkipAutomaticQueueDeclaration = false
    });
store.Maintenance.Send(operation);