Queue ETL: Amazon SQS


  • Amazon SQS (Simple Queue Service) is a distributed Message Queue service (like Azure Queue Storage and others) that is widely used for its scalability, durability, availability, and queueing methods:

    • Standard queueing for enormous throughput.
    • FIFO queueing to control delivery order and prevent message duplication.
  • Create an Amazon SQS ETL Task to:

    • Extract data from a RavenDB database,
    • Transform the data using one or more custom scripts,
    • and Load the resulting JSON object to an SQS destination as a CloudEvents message.

This article focuses on the creation of an Amazon SQS ETL task using the Client API.
To define an Amazon SQS ETL task from Studio, see Studio: Amazon SQS ETL Task.
For an overview of Queue ETL tasks, see Queue ETL tasks overview.


RavenDB ETL and Amazon SQS

  • Utilizing SQS ETL tasks allows RavenDB to take the role of an event producer in an Amazon SQS architecture, leveraging RavenDB's feature set and SQS's powerful message distribution capabilities.

  • Loading RavenDB messages to an SQS queue can automatically trigger AWS Lambda Functions, enabling economical processing and powerful workflows.

    Enqueueing RavenDB messages in SQS can also be integrated with other AWS services, such as Amazon SNS to distribute message-related notifications and Step Functions to manage and visualize your workflows.

Read more about Amazon SQS in the platform's official documentation.

Queue methods

The data that ETL tasks handle is carefully selected and tailored to specific user needs.
Selecting which queue type the Amazon SQS ETL task will use should likewise take the nature of the transferred data into account.


Standard queueing

Standard queueing offers an extremely high transfer rate, but cannot guarantee that messages arrive in the order they were sent or that no message is duplicated.

Use standard queueing when quick delivery takes precedence over message order and uniqueness, or when the recipient can compensate for them.
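
For illustration, here is a minimal sketch of a transformation-script snippet that targets a standard queue.
The queue name and object fields are examples only; a standard queue simply uses its plain name, without the .fifo suffix shown in the FIFO section below.

Script = @"// Load a small event object to the standard 'Orders' queue
           // (no '.fifo' suffix - order and uniqueness are not guaranteed)
           // ==============================================================
           loadTo('Orders', { Id: id(this), Company: this.Company }, {
               Id: id(this),
               Type: 'com.example.orders',
               Source: '/orders'
           });"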


FIFO queueing

FIFO queueing controls delivery order using a First-In-First-Out queue and ensures the delivery of each message exactly once, in exchange for a much slower transfer rate than that of the Standard Queueing method.

To load messages to a FIFO queue, add .fifo to the queue name while calling the transformation script's loadTo method:

Script = @"// Create an orderData object
           // ==========================
           var orderData = {
               Id: id(this), // property with RavenDB document ID
               OrderLinesCount: this.Lines.length,
               TotalCost: 0
           };

           for (var i = 0; i < this.Lines.length; i++) {
               var line = this.Lines[i];
               var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
               orderData.TotalCost += cost;
           }

           // Load the object to the FIFO 'Orders' queue on the SQS destination
           // ================================================================= 
           loadTo('orders.fifo', orderData, {
               Id: id(this),
               Type: 'com.github.users',
               Source: '/registrations/direct-signup'
           });"
  • Deduplication:
    FIFO queues automatically prevent duplicate messages within a deduplication interval.
    Interval default: 5 minutes
    Deduplication is achieved by setting each message's Message Deduplication ID to the document's unique change vector.
    If the change vector is longer than 128 characters (Amazon's limit for a message deduplication ID), the change vector is truncated and a -{9-digit hash} suffix is appended.

  • Message Grouping:
    Messages with the same Message Group ID are processed in order.
    Each group is handled independently, allowing parallel processing across different message groups.

Use this method when throughput is not as important as the order and uniqueness of arriving messages.


Caution: ETL message size -vs- Queue message size

Please be aware that the maximum size of an SQS queue message is 64 KB, while the maximum size of an ETL message to the queue is 256 KB.
The significance of this difference is that a maximum-size ETL message arriving at its destination queue may be charged not as 1 queue message but as 4 (256 KB / 64 KB = 4).

Add an Amazon SQS connection string

Prior to setting up the ETL task, define a connection string that the task will use to access your SQS destination.
The connection string includes the authorization credentials required to connect.


Authentication methods

The authorization method that the ETL task uses to access the SQS target is determined by properties of the connection string it uses, as shown in the example below.

Example

// Prepare the connection string:
// ==============================
var conStr = new QueueConnectionString
{
    // Provide a name for this connection string
    Name = "mySqsConStr", 
    
    // Set the broker type
    BrokerType = QueueBrokerType.AmazonSqs,

    AmazonSqsConnectionSettings = new AmazonSqsConnectionSettings()
    {
        // Define whether to use a password or not.
        // Set to `true` to authorize a dedicated machine that requires no password.
        // You can only use this option in self-hosted mode.
        Passwordless = false,

        // SQS destination authorization parameters
        Basic = new Basic
            {
                AccessKey = SqsAccessKey,
                SecretKey = SqsSecretKey,
                RegionName = SqsRegionName
            },
    }
};

// Deploy (send) the connection string to the server via the PutConnectionStringOperation:
// =======================================================================================
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(conStr));
  • Passwordless
    Defines whether to use a password or not.
    Set this property to true if the target machine is pre-authorized.
    This authorization method can only be used in self-hosted mode.
    (A minimal sketch of a passwordless connection string is shown after this list.)

  • Basic
    Defines these authorization properties:

    • AccessKey
    • SecretKey
    • RegionName
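
For reference, below is a minimal sketch of a passwordless connection string.
It assumes the target machine is pre-authorized and the server runs in self-hosted mode; the connection string name is illustrative.

// A minimal sketch of a passwordless SQS connection string
// (self-hosted mode only; assumes the target machine is pre-authorized)
// =====================================================================
var passwordlessConStr = new QueueConnectionString
{
    Name = "mySqsPasswordlessConStr",
    BrokerType = QueueBrokerType.AmazonSqs,

    AmazonSqsConnectionSettings = new AmazonSqsConnectionSettings()
    {
        // No Basic credentials are provided when Passwordless is enabled
        Passwordless = true
    }
};

store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(passwordlessConStr));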

Syntax

public class AmazonSqsConnectionSettings
{
    public Basic Basic { get; set; }
    public bool Passwordless { get; set; }
}
public class Basic
{
    public string AccessKey { get; set; }
    public string SecretKey { get; set; }
    public string RegionName { get; set; }
}
public class QueueConnectionString : ConnectionString
{
    // Set to QueueBrokerType.AmazonSqs for an SQS connection string
    public QueueBrokerType BrokerType { get; set; }
    
    // Configure this when setting a connection string for Kafka
    public KafkaConnectionSettings KafkaConnectionSettings { get; set; }
    
    // Configure this when setting a connection string for RabbitMQ
    public RabbitMqConnectionSettings RabbitMqConnectionSettings { get; set; }

    // Configure this when setting a connection string for Azure Queue Storage
    public AzureQueueStorageConnectionSettings AzureQueueStorageConnectionSettings { get; set; }

    // Configure this when setting a connection string for Amazon SQS
    public AmazonSqsConnectionSettings AmazonSqsConnectionSettings { get; set; }
}
public enum QueueBrokerType
{
    None,
    Kafka,
    RabbitMq,
    AzureQueueStorage,
    AmazonSqs
}

Add an Amazon SQS ETL task

Example: Add SQS ETL task

  • In this example, the Amazon SQS ETL Task will -
    • Extract source documents from the "Orders" collection in RavenDB.
    • Process each "Order" document using a defined script that creates a new orderData object.
    • Load the orderData object to the "OrdersQueue" queue on an SQS destination.
  • For more details about the script and the loadTo method, see the transformation script section below.

// Define a transformation script for the task: 
// ============================================
Transformation transformation = new Transformation
{
    // Define the input collections
    Collections = { "Orders" },
    ApplyToAllDocuments = false,
    
    // The transformation script
    Name = "scriptName",
    Script = @"// Create an orderData object
               // ==========================
               var orderData = {
                   Id: id(this),
                   OrderLinesCount: this.Lines.length,
                   TotalCost: 0
               };

               // Update the orderData's TotalCost field
               // ======================================
               for (var i = 0; i < this.Lines.length; i++) {
                   var line = this.Lines[i];
                   var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
                   orderData.TotalCost += cost;
               }

               // Load the object to the 'OrdersQueue' queue on the SQS destination
               // ================================================================= 
               loadToOrdersQueue(orderData, {
                   Id: id(this),
                   Type: 'com.example.promotions',
                   Source: '/promotion-campaigns/summer-sale'
               });"
};

// Define the SQS ETL task
// =======================
var etlTask = new QueueEtlConfiguration()
{
    BrokerType = QueueBrokerType.AmazonSqs,
    
    Name = "myAmazonSqsEtlTaskName",
    ConnectionStringName = "myAmazonSqsConStr",
    
    Transforms = { transformation },
    
    // Set to false to allow task failover to another node if the current one is down
    PinToMentorNode = false
}; 

// Deploy (send) the task to the server via the AddEtlOperation:
// =============================================================
store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));
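
Alternatively, the deployment call can capture the result returned by AddEtlOperation (it includes the new task's ID), and the ongoing task's info can then be retrieved.
The sketch below assumes the general ongoing-tasks API (GetOngoingTaskInfoOperation with OngoingTaskType.QueueEtl); the task name matches the example above.

// Optional sketch: capture the operation result (includes the new task's ID)
// and retrieve the ongoing task's info - assumes the general ongoing-tasks API
// =============================================================================
var addEtlResult = store.Maintenance.Send(
    new AddEtlOperation<QueueConnectionString>(etlTask));
long taskId = addEtlResult.TaskId;

var taskInfo = store.Maintenance.Send(
    new GetOngoingTaskInfoOperation("myAmazonSqsEtlTaskName", OngoingTaskType.QueueEtl));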

Delete processed documents

  • It is possible to delete documents from a RavenDB database once they have been processed by the Queue ETL task.
  • To do this, set the optional Queues property in the ETL configuration with the list of SQS queues for which processed documents are to be deleted.

var etlTask = new QueueEtlConfiguration()
{
    BrokerType = QueueBrokerType.AmazonSqs,
    
    Name = "myAmazonSqsEtlTaskName",
    ConnectionStringName = "myAmazonSqsConStr",
    
    Transforms = { transformation },

    // Define whether to delete documents from RavenDB after they are sent to the target queue
    Queues = new List<EtlQueue>()
    {
        new()
        {
            // The name of the SQS queue  
            Name = "OrdersQueue",

            // When set to 'true',
            // documents that were processed by the transformation script will be deleted
            // from RavenDB after the message is loaded to the target queue
            DeleteProcessedDocuments = true
        }
    }
}; 

store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));

Syntax

public class QueueEtlConfiguration
{
    // Set to QueueBrokerType.AmazonSqs to define an SQS Queue ETL task
    public QueueBrokerType BrokerType { get; set; }
    // The ETL task name
    public string Name { get; set; }
    // The registered connection string name
    public string ConnectionStringName { get; set; }
    // List of transformation scripts
    public List<Transformation> Transforms { get; set; }
    // Optional configuration per queue
    public List<EtlQueue> Queues { get; set; }
    // Set to 'false' to allow task failover to another node if the current one is down
    public bool PinToMentorNode { get; set; }
}

public class Transformation
{
    // The script name
    public string Name { get; set; }
    // The source RavenDB collections that serve as the input for the script
    public List<string> Collections { get; set; }
    // Set whether to apply the script on all collections
    public bool ApplyToAllDocuments { get; set; }
    // The script itself
    public string Script { get; set; }
}

public class EtlQueue
{
    // The SQS queue name
    public string Name { get; set; }
    // Delete processed documents when set to 'true'
    public bool DeleteProcessedDocuments { get; set; }
}

The transformation script

The basic characteristics of an Amazon SQS ETL script are similar to those of other ETL types.
The script defines what data to extract from the source document, how to transform this data,
and which SQS Queue to load the data to.


The loadTo method

To specify which SQS queue to load the data to, use either of the following methods in your script.
The two methods are equivalent, offering alternative syntax:

  • loadTo<QueueName>(obj, {attributes})

    loadToOrdersQueue(orderData, {
          Id: id(this),
          Type: 'com.example.promotions',
          Source: '/promotion-campaigns/summer-sale'
          });
    • Here the target is specified as part of the function name.
    • The target <QueueName> in this syntax is not a variable and cannot be used as one;
      it is simply a string literal of the target's name.
  • loadTo('QueueName', obj, {attributes})

    loadTo('OrdersQueue', orderData, {
          Id: id(this),
          Type: 'com.example.promotions',
          Source: '/promotion-campaigns/summer-sale'
          });
    • Here the target is passed as an argument to the method.
    • Separating the target name from the loadTo command makes it possible to include symbols like '-' and '.' in target names.
      This is not possible when the loadTo<QueueName> syntax is used because including special characters in the name of a JavaScript function makes it invalid.
    • To deliver messages to a FIFO queue, use this format and add .fifo to the queue name.
      loadTo('QueueName.fifo', obj, {attributes})
Parameter    Type     Description
----------   ------   ----------------------------------------------------------
QueueName    string   The name of the SQS Queue
obj          object   The object to transfer
attributes   object   An object with optional & required CloudEvents attributes

For example, the following two calls, which load data to "OrdersQueue", are equivalent:

  • loadToOrdersQueue(obj, {attributes})
  • loadTo('OrdersQueue', obj, {attributes})

The following is a sample script that processes documents from the Orders collection:

// Create an orderData object
// ==========================
var orderData = {
    Id: id(this),
    OrderLinesCount: this.Lines.length,
    TotalCost: 0
};

// Update the orderData's TotalCost field
// ======================================
for (var i = 0; i < this.Lines.length; i++) {
    var line = this.Lines[i];
    var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
    orderData.TotalCost += cost;
}

// Load the object to the "OrdersQueue" queue on the SQS destination
// =================================================================
loadToOrdersQueue(orderData, {
    Id: id(this),
    Type: 'com.example.promotions',
    Source: '/promotion-campaigns/summer-sale'
})