Queue ETL: RabbitMQ


  • RabbitMQ exchanges route messages to one or more queues according to the exchange type and its bindings,
    which makes it possible to build flexible routing topologies for complex messaging scenarios.

  • Create a RabbitMQ ETL Task to:

    • Extract data from a RavenDB database
    • Transform the data using one or more custom scripts
    • Load the resulting JSON object to a RabbitMQ destination as a CloudEvents message
  • This task allows RavenDB to act as an event producer in a RabbitMQ architecture.

  • Read more about RabbitMQ in the platform's official documentation.



Add a RabbitMQ connection string

Before setting up the ETL task, define a connection string that the task will use to connect to RabbitMQ.


Example

// Prepare the connection string:
// ==============================
var conStr = new QueueConnectionString
{
    // Provide a name for this connection string
    Name = "myRabbitMqConStr",
    
    // Set the broker type
    BrokerType = QueueBrokerType.RabbitMq,
    
    // Configure the connection details
    RabbitMqConnectionSettings = new RabbitMqConnectionSettings() 
        { ConnectionString = "amqp://guest:guest@localhost:49154" }
};

// Deploy (send) the connection string to the server via the PutConnectionStringOperation:
// =======================================================================================
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(conStr));
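The ConnectionString value above is an AMQP URI of the form amqp://user:password@host:port/vhost. To make its parts explicit, here is a small sketch that splits such a URI with the standard WHATWG URL class. The function name describeAmqpUri is hypothetical and not part of RavenDB or any RabbitMQ client; the defaults it applies (port 5672 for amqp, 5671 for amqps, vhost "/" when no path is given) follow the RabbitMQ URI specification.

```javascript
// Hypothetical helper: break an AMQP URI into the parts a RabbitMQ client connects with.
// Relies only on the standard WHATWG URL class (global in Node.js and browsers).
function describeAmqpUri(uri) {
  const url = new URL(uri);
  if (url.protocol !== "amqp:" && url.protocol !== "amqps:") {
    throw new Error(`Unsupported scheme: ${url.protocol}`);
  }
  const secure = url.protocol === "amqps:";
  return {
    user: decodeURIComponent(url.username),
    password: decodeURIComponent(url.password),
    host: url.hostname,
    // Standard ports: 5672 for amqp, 5671 for amqps (TLS).
    port: url.port ? Number(url.port) : secure ? 5671 : 5672,
    // No path at all selects the broker's default virtual host "/";
    // otherwise the text after the first "/" is the vhost name, percent-decoded.
    vhost: url.pathname === "" ? "/" : decodeURIComponent(url.pathname.slice(1)),
  };
}

// The connection string used in the example above:
console.log(describeAmqpUri("amqp://guest:guest@localhost:49154"));
```

Because the example URI states port 49154 explicitly, the standard AMQP port is not applied; omitting the port would make a client fall back to 5672.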

Syntax

public class QueueConnectionString : ConnectionString
{
    // Set the broker type to QueueBrokerType.RabbitMq for a RabbitMQ connection string
    public QueueBrokerType BrokerType { get; set; }
    
    // Configure this when setting a connection string for Kafka
    public KafkaConnectionSettings KafkaConnectionSettings { get; set; }
    
    // Configure this when setting a connection string for RabbitMQ
    public RabbitMqConnectionSettings RabbitMqConnectionSettings { get; set; }
    
    // Configure this when setting a connection string for Azure Queue Storage
    public AzureQueueStorageConnectionSettings AzureQueueStorageConnectionSettings { get; set; }
}
public enum QueueBrokerType
{
    None,
    Kafka,
    RabbitMq,
    AzureQueueStorage
}
public sealed class RabbitMqConnectionSettings
{
    // A single string (an AMQP URI) that specifies the RabbitMQ broker connection details
    public string ConnectionString { get; set; }
}

Add a RabbitMQ ETL task

Example - basic:


  • In this example, the RabbitMQ ETL Task will -
    • Extract source documents from the "Orders" collection in RavenDB.
    • Process each "Order" document using a defined script that creates a new orderData object.
    • Load the orderData object to the "OrdersExchange" in a RabbitMQ broker.
  • For more details about the script and the loadTo method overloads, see the Transformation script section below.

// Define a transformation script for the task: 
// ============================================
Transformation transformation = new Transformation
{
    // Define the input collections
    Collections = { "Orders" },
    ApplyToAllDocuments = false,
    
    // The transformation script
    Name = "scriptName",
    Script = @"// Create an orderData object
               // ==========================
               var orderData = {
                   Id: id(this), 
                   OrderLinesCount: this.Lines.length,
                   TotalCost: 0
               };

               // Update the orderData's TotalCost field
               // ======================================
               for (var i = 0; i < this.Lines.length; i++) {
                   var line = this.Lines[i];
                   var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
                   orderData.TotalCost += cost;
               }

               // Load the object to the 'OrdersExchange' in RabbitMQ
               // =================================================== 
               loadToOrdersExchange(orderData, 'routingKey', {
                   Id: id(this),
                   Type: 'com.example.promotions',
                   Source: '/promotion-campaigns/summer-sale'
               });"
};

// Define the RabbitMQ ETL task:
// =============================
var etlTask = new QueueEtlConfiguration()
{
    BrokerType = QueueBrokerType.RabbitMq,
    
    Name = "myRabbitMqEtlTaskName",
    ConnectionStringName = "myRabbitMqConStr",
    
    Transforms = { transformation },

    // Set to false to have the RabbitMQ client library declare the queue if it does not exist
    SkipAutomaticQueueDeclaration = false,
    
    // Set to false to allow task failover to another node if the current one is down
    PinToMentorNode = false
};

// Deploy (send) the task to the server via the AddEtlOperation:
// =============================================================
store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));

Example - delete processed documents:


  • You have the option to delete documents from your RavenDB database once they have been processed by the Queue ETL task.

  • Set the optional Queues property in your ETL configuration with the list of RabbitMQ queues for which processed documents should be deleted.

var etlTask = new QueueEtlConfiguration()
{
    BrokerType = QueueBrokerType.RabbitMq,
    
    Name = "myRabbitMqEtlTaskName",
    ConnectionStringName = "myRabbitMqConStr",
    
    Transforms = { transformation },

    // Define whether to delete documents from RavenDB after they are sent to RabbitMQ
    Queues = new List<EtlQueue>()
    {
        new()
        {
            // The name of the target queue
            Name = "OrdersQueue",

            // When set to 'true',
            // documents that were processed by the transformation script will be deleted
            // from RavenDB after the message is loaded to the "OrdersQueue" in RabbitMQ.
            DeleteProcessedDocuments = true
        }
    }
}; 

store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));


Syntax

public class QueueEtlConfiguration
{
    // Set to QueueBrokerType.RabbitMq to define a RabbitMQ ETL task
    public QueueBrokerType BrokerType { get; set; }
    // The ETL task name
    public string Name { get; set; }
    // The registered connection string name
    public string ConnectionStringName { get; set; }
    // List of transformation scripts
    public List<Transformation> Transforms { get; set; }
    // Optional configuration per queue
    public List<EtlQueue> Queues { get; set; }
    // Set to 'false' to allow task failover to another node if the current one is down
    public bool PinToMentorNode { get; set; }
    
    // Set to 'false' to have the RabbitMQ client library declare the queue if it does not exist.
    // Set to 'true' to skip automatic queue declaration;
    // use this option when you prefer to define Exchanges, Queues & Bindings manually.
    public bool SkipAutomaticQueueDeclaration { get; set; }
}

public class Transformation
{
    // The script name
    public string Name { get; set; }
    // The source RavenDB collections that serve as the input for the script
    public List<string> Collections { get; set; }
    // Set whether to apply the script to all documents, regardless of their collection
    public bool ApplyToAllDocuments { get; set; }
    // The script itself
    public string Script { get; set; }
}

public class EtlQueue
{
    // The RabbitMQ target queue name
    public string Name { get; set; }
    // Delete processed documents when set to 'true'
    public bool DeleteProcessedDocuments { get; set; }
}

The transformation script

The basic characteristics of a RabbitMQ ETL script are similar to those of other ETL types.
The script defines what data to extract from the source document, how to transform this data,
and which RabbitMQ Exchange to load it to.


The loadTo method

To specify which RabbitMQ Exchange to load the data into, use either of the following methods in your script.
The two methods are equivalent, offering alternative syntax:

  • loadTo<ExchangeName>(obj, 'routingKey', {attributes})

    • Here the target is specified as part of the function name.
    • The target <ExchangeName> in this syntax is not a variable and cannot be used as one;
      it is simply the literal name of the target.
  • loadTo('ExchangeName', obj, 'routingKey', {attributes})

    • Here the target is passed as an argument to the method.
    • Because the target name is passed separately from the method name, it can include symbols like '-' and '.'. This is not possible with the loadTo<ExchangeName> syntax, because such characters are not allowed in a JavaScript function name.

Parameter      Type     Description
------------   ------   -----------
ExchangeName   string   The name of the RabbitMQ exchange.
obj            object   The object to transfer.
routingKey     string   The routing key. The exchange evaluates it to decide which queues
                        receive the message; how it is used depends on the exchange type.
attributes     object   An object with CloudEvents attributes.
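The attributes parameter carries CloudEvents metadata. As a rough illustration of the CloudEvents 1.0 attribute model, the sketch below assembles a JSON-form event from a loadTo-style attributes object. The helper toCloudEvent is hypothetical, and mapping the capitalized Id, Type, and Source names to the spec's lowercase id, type, and source is an assumption of this sketch; RavenDB's actual wire format for the RabbitMQ message is not shown here and may differ.

```javascript
// Hypothetical helper: build a CloudEvents 1.0 event (JSON form) from loadTo-style attributes.
// Assumption: Id/Type/Source correspond to the spec's id/type/source attributes.
function toCloudEvent(data, attributes) {
  const event = {
    specversion: "1.0",                  // required by the CloudEvents spec
    id: attributes.Id,                   // required
    type: attributes.Type,               // required
    source: attributes.Source,           // required (a URI-reference)
    datacontenttype: "application/json", // optional
    data,
  };
  // The spec requires id, type and source to be non-empty strings.
  for (const name of ["id", "type", "source"]) {
    if (typeof event[name] !== "string" || event[name] === "") {
      throw new Error(`CloudEvents attribute '${name}' must be a non-empty string`);
    }
  }
  return event;
}

const event = toCloudEvent(
  { Id: "orders/1-A", OrderLinesCount: 2, TotalCost: 50 },
  { Id: "orders/1-A", Type: "com.example.promotions", Source: "/promotion-campaigns/summer-sale" }
);
console.log(JSON.stringify(event, null, 2));
```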

For example, the following two calls, which load data to the OrdersExchange exchange, are equivalent:

  • loadToOrdersExchange(obj, 'users', {attributes})
  • loadTo('OrdersExchange', obj, 'users', {attributes})
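To see why the two forms are interchangeable, and why only some names can be embedded in the function name, here is a sketch that runs outside RavenDB. The loadTo and loadToOrdersExchange functions below are hypothetical stand-ins that only record their calls; they are not the ETL runtime's implementation. The identifier check is an ASCII-only approximation of JavaScript's identifier rules.

```javascript
// Hypothetical stand-ins for the ETL runtime functions: they only record calls.
const sent = [];
function loadTo(exchangeName, obj, routingKey, attributes) {
  sent.push({ exchangeName, obj, routingKey, attributes });
}
// The name-in-function form simply fixes the exchange name.
function loadToOrdersExchange(obj, routingKey, attributes) {
  loadTo("OrdersExchange", obj, routingKey, attributes);
}

const obj = { Id: "orders/1-A" };
const attributes = { Type: "com.example.promotions" };
loadToOrdersExchange(obj, "users", attributes);
loadTo("OrdersExchange", obj, "users", attributes);

// ASCII-only approximation of a valid JavaScript identifier.
const isIdentifier = (name) => /^[A-Za-z_$][A-Za-z0-9_$]*$/.test(name);
console.log(isIdentifier("loadToOrdersExchange"));  // true
console.log(isIdentifier("loadToOrders-Exchange")); // false: needs loadTo('Orders-Exchange', ...)
```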

Available method overloads

  • loadTo('', obj, 'routingKey', {attributes})
    When the exchange name is an empty string, the message is published to the default exchange, which is predefined by the broker
    and routes the message to the queue whose name matches the routingKey.

  • loadTo<ExchangeName>(obj)
    loadTo<ExchangeName>(obj, {attributes})
    When the routingKey is omitted, message delivery depends on the exchange type.

  • loadTo<ExchangeName>(obj, 'routingKey')
    When the attributes are omitted, default attribute values are assigned.


If the target exchange is not defined on the RabbitMQ broker, RavenDB creates it as an exchange of the fanout type. In this case, all routing keys are ignored and messages are distributed to all bound queues.
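How the routing key is used depends on the exchange type. The sketch below is a simplified model of RabbitMQ's behavior for the default exchange, direct exchanges, and fanout exchanges only (topic and headers exchanges are not modeled). The route function and the "audit-queue" binding are hypothetical illustrations, not a RabbitMQ or RavenDB API.

```javascript
// Simplified model of how a RabbitMQ exchange uses the routing key.
// exchange: { type, bindings: [{ queue, bindingKey }] }; queues: all declared queue names.
function route(exchange, routingKey, queues = []) {
  let targets;
  switch (exchange.type) {
    case "default":
      // The default exchange is implicitly bound to every queue,
      // with the queue's own name as the binding key.
      targets = queues.filter((q) => q === routingKey);
      break;
    case "direct":
      // Deliver to queues whose binding key equals the routing key exactly.
      targets = exchange.bindings
        .filter((b) => b.bindingKey === routingKey)
        .map((b) => b.queue);
      break;
    case "fanout":
      // The routing key is ignored; every bound queue receives a copy.
      targets = exchange.bindings.map((b) => b.queue);
      break;
    default:
      throw new Error(`Exchange type not modeled here: ${exchange.type}`);
  }
  return [...new Set(targets)]; // a queue receives at most one copy per message
}

const bindings = [
  { queue: "users-queue", bindingKey: "users" },
  { queue: "audit-queue", bindingKey: "audit" },
];
console.log(route({ type: "direct", bindings }, "users")); // [ 'users-queue' ]
console.log(route({ type: "fanout", bindings }, "users")); // [ 'users-queue', 'audit-queue' ]
console.log(route({ type: "default" }, "users-queue", ["users-queue", "audit-queue"])); // [ 'users-queue' ]
```

The fanout case shows why the routing key has no effect when RavenDB creates the exchange itself.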


A sample script that processes documents from the Orders collection:

// Create an orderData object
// ==========================
var orderData = {
    Id: id(this),
    OrderLinesCount: this.Lines.length,
    TotalCost: 0
};

// Update the orderData's TotalCost field
// ======================================
for (var i = 0; i < this.Lines.length; i++) {
    var line = this.Lines[i];
    var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
    orderData.TotalCost += cost;
}

// Load the object to "OrdersExchange" in RabbitMQ
// ===============================================
loadToOrdersExchange(orderData, 'users-queue', {
    Id: id(this),
    Type: 'com.example.promotions',
    Source: '/promotion-campaigns/summer-sale'
});
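The sample script calls id() and loadToOrdersExchange(), which exist only inside RavenDB's ETL runtime. To check the script's arithmetic locally, here is a sketch that runs the same logic against hypothetical stand-ins: the id stub reads a made-up "@metadata"/"@id" field, and the loadToOrdersExchange stub only records what would have been sent. The order document is invented test data.

```javascript
// Hypothetical stand-ins for functions the RavenDB ETL runtime provides.
const loaded = [];
function id(doc) {
  return doc["@metadata"]["@id"];
}
function loadToOrdersExchange(obj, routingKey, attributes) {
  loaded.push({ obj, routingKey, attributes });
}

// The sample script, wrapped in a function so that `this` is the source document.
function runScript() {
  var orderData = {
    Id: id(this),
    OrderLinesCount: this.Lines.length,
    TotalCost: 0
  };
  for (var i = 0; i < this.Lines.length; i++) {
    var line = this.Lines[i];
    var cost = (line.Quantity * line.PricePerUnit) * (1 - line.Discount);
    orderData.TotalCost += cost;
  }
  loadToOrdersExchange(orderData, 'users-queue', {
    Id: id(this),
    Type: 'com.example.promotions',
    Source: '/promotion-campaigns/summer-sale'
  });
}

// Made-up order: 2 x 10 with no discount, plus 1 x 40 at 25% off (20 + 30 = 50).
const order = {
  "@metadata": { "@id": "orders/1-A" },
  Lines: [
    { Quantity: 2, PricePerUnit: 10, Discount: 0 },
    { Quantity: 1, PricePerUnit: 40, Discount: 0.25 }
  ]
};
runScript.call(order);
console.log(loaded[0].obj); // { Id: 'orders/1-A', OrderLinesCount: 2, TotalCost: 50 }
```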