Queue ETL: Azure Queue Storage
Azure Queue Storage is a Microsoft Azure service for storing and retrieving large numbers of messages. Applications use it to communicate asynchronously: one application sends messages to a queue, and another receives and processes them. Each message can be up to 64 KB in size, and a single queue can hold millions of messages, up to the capacity limit of the storage account, which makes it a scalable option for data processing.
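To make the 64 KB limit concrete, the sketch below measures the UTF-8 size of an object serialized to JSON and compares it with 64 × 1024 bytes. It is a hypothetical helper, not part of RavenDB or the Azure SDK. The CloudEvents envelope that wraps the object adds further bytes, and whether the payload is Base64-encoded in transit depends on the client configuration (an assumption here), which is why Base64 is modeled as an option: encoding turns every 3 raw bytes into 4 characters, so roughly 48 KB of raw JSON is the practical ceiling in that case.

```javascript
// Azure Queue Storage caps a single message at 64 KB (64 * 1024 bytes).
const MAX_MESSAGE_BYTES = 64 * 1024;

// Size in bytes of an object once serialized to JSON text and encoded as UTF-8.
function jsonByteLength(obj) {
  return Buffer.byteLength(JSON.stringify(obj), "utf8");
}

// Check whether the serialized object fits the limit.
// base64: assume the text is Base64-encoded before sending (4 chars per 3 bytes).
function fitsInQueueMessage(obj, { base64 = false } = {}) {
  const raw = jsonByteLength(obj);
  const sent = base64 ? 4 * Math.ceil(raw / 3) : raw;
  return sent <= MAX_MESSAGE_BYTES;
}

const small = { Id: "orders/1-A", TotalCost: 35 };
console.log(fitsInQueueMessage(small)); // true
```

Leave headroom below the limit for the envelope and attributes rather than sizing payloads right up to 64 KB.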
Create an Azure Queue Storage ETL Task to:
- Extract data from a RavenDB database
- Transform the data using one or more custom scripts
- Load the resulting JSON object to an Azure Queue destination as a CloudEvents message
With this task, RavenDB acts as an event producer in an Azure Queue architecture.
Azure Functions can be triggered to consume and process messages sent to Azure queues,
enabling flexible, event-driven workflows. The visibility timeout and time-to-live of messages in the queue can be customized through ETL configuration options.
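The two message settings mentioned above behave as follows in Azure: a message stays hidden from consumers until its visibility timeout elapses, and Azure deletes it once its time-to-live expires (7 days by default; -1 means it never expires). The sketch below models only that Azure behavior for a single message. It does not read or set any RavenDB configuration option, and the function name is hypothetical; the validation mirrors Azure's documented constraints that the visibility timeout be between 0 and 7 days and shorter than the time-to-live.

```javascript
const SEVEN_DAYS_SEC = 7 * 24 * 60 * 60; // 604800 s, Azure's default message TTL

// Compute when an enqueued message becomes visible and when it expires.
// ttlSec = -1 models a message that never expires.
function messageTimeline(enqueuedAtMs, visibilityTimeoutSec = 0, ttlSec = SEVEN_DAYS_SEC) {
  if (visibilityTimeoutSec < 0 || visibilityTimeoutSec > SEVEN_DAYS_SEC) {
    throw new RangeError("visibility timeout must be between 0 and 7 days");
  }
  if (ttlSec !== -1 && (ttlSec <= 0 || visibilityTimeoutSec >= ttlSec)) {
    throw new RangeError("the message must become visible before it expires");
  }
  const visibleAt = new Date(enqueuedAtMs + visibilityTimeoutSec * 1000);
  const expiresAt = ttlSec === -1 ? null : new Date(enqueuedAtMs + ttlSec * 1000);
  return { visibleAt, expiresAt };
}

const t = messageTimeline(Date.UTC(2024, 0, 1), 30);
console.log(t.visibleAt.toISOString()); // 2024-01-01T00:00:30.000Z
console.log(t.expiresAt.toISOString()); // 2024-01-08T00:00:00.000Z
```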
Read more about Azure Queue Storage in the platform's official documentation.
This article focuses on how to create an Azure Queue Storage ETL task using the Client API.
To define an Azure Queue Storage ETL task from the Studio, see Studio: Azure Queue Storage ETL Task.
For an overview of Queue ETL tasks, see Queue ETL tasks overview.
Add an Azure Queue Storage connection string
Prior to setting up the ETL task, define a connection string that the task will use to access your Azure account.
The connection string includes the authorization credentials required to connect.
Authentication methods:
There are three authentication methods available:
Connection string
- Provide a single string that includes all the options required to connect to your Azure account.
Learn more about Azure Storage connection strings here.
Note: the following connection string parameters are mandatory:
AccountName
AccountKey
DefaultEndpointsProtocol
QueueEndpoint (when using the http protocol)
Entra ID
- Use the Entra ID authorization method to authenticate with Microsoft Entra ID credentials (tenant ID, client ID, and client secret) instead of storage account keys.
- This approach reduces the risk of exposed credentials, which is common with connection strings, and enables finer-grained control through role-based access control (RBAC).
Passwordless
- This authorization method requires the machine to be pre-authorized and can only be used in self-hosted mode.
- Passwordless authorization works only when the account on the machine is assigned the Storage Queue Data Contributor role; the Contributor role alone is not sufficient.
Example:
// Prepare the connection string:
// ==============================
var conStr = new QueueConnectionString
{
// Provide a name for this connection string
Name = "myAzureQueueConStr",
// Set the broker type
BrokerType = QueueBrokerType.AzureQueueStorage,
// In this example, authenticate with a connection string.
// The string is built on a single line so it contains no line breaks or stray whitespace.
AzureQueueStorageConnectionSettings = new AzureQueueStorageConnectionSettings()
{
ConnectionString = "DefaultEndpointsProtocol=https;" +
"AccountName=myAccountName;" +
"AccountKey=myAccountKey;" +
"EndpointSuffix=core.windows.net"
}
};
// Deploy (send) the connection string to the server via the PutConnectionStringOperation:
// =======================================================================================
var res = store.Maintenance.Send(
new PutConnectionStringOperation<QueueConnectionString>(conStr));
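The mandatory-parameter rule for connection strings can be checked before deploying one. The sketch below is a hypothetical JavaScript helper, not a RavenDB or Azure SDK API: it splits a `Key=Value;Key=Value` string and reports which mandatory keys are missing, requiring `QueueEndpoint` only when the protocol is `http`. Values are split at the first `=` only, because Base64 account keys often end in `=` padding; matching key names case-sensitively is a simplifying assumption.

```javascript
// Parse "Key=Value;Key=Value" into a Map, splitting each segment at its first '='.
function parseConnectionString(conStr) {
  const settings = new Map();
  for (const part of conStr.split(";")) {
    const segment = part.trim();
    if (segment === "") continue; // tolerate a trailing ';'
    const eq = segment.indexOf("=");
    if (eq <= 0) throw new Error(`Malformed segment: "${segment}"`);
    settings.set(segment.slice(0, eq).trim(), segment.slice(eq + 1).trim());
  }
  return settings;
}

// Return the mandatory keys that are absent from the connection string.
function missingMandatoryKeys(conStr) {
  const settings = parseConnectionString(conStr);
  const required = ["AccountName", "AccountKey", "DefaultEndpointsProtocol"];
  const protocol = (settings.get("DefaultEndpointsProtocol") || "").toLowerCase();
  if (protocol === "http") required.push("QueueEndpoint");
  return required.filter((key) => !settings.has(key));
}

console.log(missingMandatoryKeys(
  "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=abc=="
)); // [ 'QueueEndpoint' ]
```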
Syntax:
public class QueueConnectionString : ConnectionString
{
// Set the broker type to QueueBrokerType.AzureQueueStorage
// for an Azure Queue Storage connection string
public QueueBrokerType BrokerType { get; set; }
// Configure this when setting a connection string for Kafka
public KafkaConnectionSettings KafkaConnectionSettings { get; set; }
// Configure this when setting a connection string for RabbitMQ
public RabbitMqConnectionSettings RabbitMqConnectionSettings { get; set; }
// Configure this when setting a connection string for Azure Queue Storage
public AzureQueueStorageConnectionSettings AzureQueueStorageConnectionSettings { get; set; }
}
public enum QueueBrokerType
{
None,
Kafka,
RabbitMq,
AzureQueueStorage
}
public class AzureQueueStorageConnectionSettings
{
public EntraId EntraId { get; set; }
public string ConnectionString { get; set; }
public Passwordless Passwordless { get; set; }
}
public class EntraId
{
public string StorageAccountName { get; set; }
public string TenantId { get; set; }
public string ClientId { get; set; }
public string ClientSecret { get; set; }
}
public class Passwordless
{
public string StorageAccountName { get; set; }
}
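The `EntraId` and `Passwordless` classes in the syntax above determine which fields each authentication method needs. As a rough illustration, the hypothetical JavaScript check below mirrors `AzureQueueStorageConnectionSettings` as a plain object and verifies that the chosen method has all its fields filled in. The rule that exactly one method is set, and that every `EntraId` field is required, are assumptions made for the sketch; RavenDB's own validation may differ.

```javascript
// Required fields per method, taken from the EntraId and Passwordless classes.
const REQUIRED_FIELDS = {
  EntraId: ["StorageAccountName", "TenantId", "ClientId", "ClientSecret"],
  Passwordless: ["StorageAccountName"],
};

// Assumed rule: exactly one of the three methods is configured.
function resolveAuthMethod(settings) {
  const configured = ["ConnectionString", "EntraId", "Passwordless"].filter(
    (method) => settings[method] != null
  );
  if (configured.length !== 1) {
    throw new Error(`Expected exactly one authentication method, found ${configured.length}`);
  }
  const method = configured[0];
  // ConnectionString has no sub-fields, so it has no entry in REQUIRED_FIELDS.
  const missing = (REQUIRED_FIELDS[method] || []).filter((f) => !settings[method][f]);
  if (missing.length > 0) {
    throw new Error(`${method} is missing: ${missing.join(", ")}`);
  }
  return method;
}

console.log(resolveAuthMethod({ Passwordless: { StorageAccountName: "myAccountName" } }));
```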
Add an Azure Queue Storage ETL task
In this example, the Azure Queue Storage ETL task will:
- Extract source documents from the "Orders" collection in RavenDB.
- Process each "Order" document using a defined script that creates a new orderData object.
- Load the orderData object to the "OrdersQueue" in Azure Queue Storage.
- For more details about the script and the loadTo method, see the transformation script section below.
// Define a transformation script for the task:
// ============================================
Transformation transformation = new Transformation
{
// Define the input collections
Collections = { "Orders" },
ApplyToAllDocuments = false,
// The script name
Name = "scriptName",
// The transformation script
Script = @"// Create an orderData object
// ==========================
var orderData = {
Id: id(this),
OrderLinesCount: this.Lines.length,
TotalCost: 0
};
// Update the orderData's TotalCost field
// ======================================
for (var i = 0; i < this.Lines.length; i++) {
var line = this.Lines[i];
var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
orderData.TotalCost += cost;
}
// Load the object to the 'OrdersQueue' in Azure
// =============================================
loadToOrdersQueue(orderData, {
Id: id(this),
Type: 'com.example.promotions',
Source: '/promotion-campaigns/summer-sale'
});"
};
// Define the Azure Queue Storage ETL task:
// ========================================
var etlTask = new QueueEtlConfiguration()
{
BrokerType = QueueBrokerType.AzureQueueStorage,
Name = "myAzureQueueEtlTaskName",
ConnectionStringName = "myAzureQueueConStr",
Transforms = { transformation },
// Set to false to allow task failover to another node if the current one is down
PinToMentorNode = false
};
// Deploy (send) the task to the server via the AddEtlOperation:
// =============================================================
store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));
You have the option to delete documents from your RavenDB database once they have been processed by the Queue ETL task.
Set the optional Queues property in your ETL configuration to the list of Azure queues for which processed documents should be deleted.
var etlTask = new QueueEtlConfiguration()
{
BrokerType = QueueBrokerType.AzureQueueStorage,
Name = "myAzureQueueEtlTaskName",
ConnectionStringName = "myAzureQueueConStr",
Transforms = { transformation },
// Define whether to delete documents from RavenDB after they are sent to the target queue
Queues = new List<EtlQueue>()
{
new()
{
// The name of the Azure queue
Name = "OrdersQueue",
// When set to 'true',
// documents that were processed by the transformation script will be deleted
// from RavenDB after the message is loaded to the "OrdersQueue" in Azure.
DeleteProcessedDocuments = true
}
}
};
store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));
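Deletion is opt-in per queue: only queues listed in `Queues` with `DeleteProcessedDocuments = true` trigger it, and a queue that is not listed keeps the default of `false`. The hypothetical JavaScript lookup below models that per-queue flag using the `EtlQueue` field names. Exact, case-sensitive name matching is an assumption, and the sketch does not attempt to model what happens when one document is loaded to several queues with different settings.

```javascript
// Map each queue name to its DeleteProcessedDocuments flag.
// Queues absent from the optional Queues list default to false.
function deletionPolicy(queues = []) {
  const byName = new Map(
    queues.map((q) => [q.Name, q.DeleteProcessedDocuments === true])
  );
  return (queueName) => byName.get(queueName) ?? false;
}

const shouldDelete = deletionPolicy([{ Name: "OrdersQueue", DeleteProcessedDocuments: true }]);
console.log(shouldDelete("OrdersQueue")); // true
console.log(shouldDelete("OtherQueue")); // false
```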
Syntax:
public class QueueEtlConfiguration
{
// Set to QueueBrokerType.AzureQueueStorage to define an Azure Queue Storage ETL task
public QueueBrokerType BrokerType { get; set; }
// The ETL task name
public string Name { get; set; }
// The registered connection string name
public string ConnectionStringName { get; set; }
// List of transformation scripts
public List<Transformation> Transforms { get; set; }
// Optional configuration per queue
public List<EtlQueue> Queues { get; set; }
// Set to 'false' to allow task failover to another node if the current one is down
public bool PinToMentorNode { get; set; }
}
public class Transformation
{
// The script name
public string Name { get; set; }
// The source RavenDB collections that serve as the input for the script
public List<string> Collections { get; set; }
// Set whether to apply the script to all documents, regardless of collection
public bool ApplyToAllDocuments { get; set; }
// The script itself
public string Script { get; set; }
}
public class EtlQueue
{
// The Azure queue name
public string Name { get; set; }
// Delete processed documents when set to 'true'
public bool DeleteProcessedDocuments { get; set; }
}
The transformation script
The basic characteristics of an Azure Queue Storage ETL script are similar to those of other ETL types.
The script defines what data to extract from the source document, how to transform this data,
and which Azure Queue to load it to.
The loadTo method
To specify which Azure queue to load the data into, use either of the following methods in your script.
The two methods are equivalent, offering alternative syntax:
loadTo<QueueName>(obj, {attributes})
- Here the target is specified as part of the function name.
- The target <QueueName> in this syntax is not a variable and cannot be used as one; it is simply the literal name of the target.
loadTo('QueueName', obj, {attributes})
- Here the target is passed as an argument to the method.
- Separating the target name from the loadTo command makes it possible to include symbols such as '-' in target names. This is not possible with the loadTo<QueueName> syntax, because special characters make the JavaScript function name invalid.
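Whether a queue name can use the `loadTo<QueueName>` form comes down to whether `loadTo` plus the name is a legal JavaScript identifier. The hypothetical helper below makes that decision and returns the call form a script would need. It is an ASCII-only simplification (JavaScript also allows many Unicode letters in identifiers), and `loadToCall` only builds a string for illustration; it is not part of the ETL script API.

```javascript
// The name is appended to "loadTo", so it may only contain identifier characters.
function canUseLoadToSuffix(queueName) {
  return /^[A-Za-z0-9_$]+$/.test(queueName);
}

// Choose between loadTo<QueueName>(...) and loadTo('QueueName', ...).
function loadToCall(queueName) {
  return canUseLoadToSuffix(queueName)
    ? `loadTo${queueName}(obj, attributes)`
    : `loadTo('${queueName}', obj, attributes)`;
}

console.log(loadToCall("OrdersQueue"));  // loadToOrdersQueue(obj, attributes)
console.log(loadToCall("orders-queue")); // loadTo('orders-queue', obj, attributes)
```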
Parameter | Type | Description |
---|---|---|
QueueName | string | The name of the Azure Queue |
obj | object | The object to transfer |
attributes | object | An object with optional & required CloudEvents attributes |
For example, the following two calls, which load data to "OrdersQueue", are equivalent:
loadToOrdersQueue(obj, {attributes})
loadTo('OrdersQueue', obj, {attributes})
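The attributes object supplies CloudEvents context attributes, and the CloudEvents 1.0 specification requires `id`, `source`, `type`, and `specversion`. The sketch below shows how the PascalCase attributes used in the scripts (`Id`, `Type`, `Source`) map onto a structured-mode JSON CloudEvent whose `data` field carries the loaded object. It only illustrates the event format: RavenDB builds the real message itself and may fill in defaults for attributes you omit, so rejecting a missing required attribute here is an assumption of the sketch, and `toCloudEvent` is a hypothetical name.

```javascript
// Build a CloudEvents 1.0 structured-mode JSON event from a payload and
// script-style attributes. CloudEvents attribute names are lowercase.
function toCloudEvent(data, attributes) {
  const event = { specversion: "1.0" };
  for (const [key, value] of Object.entries(attributes)) {
    event[key.toLowerCase()] = value; // e.g. Id -> id, DataContentType -> datacontenttype
  }
  for (const required of ["id", "source", "type"]) {
    if (!event[required]) throw new Error(`Missing CloudEvents attribute: ${required}`);
  }
  event.datacontenttype = event.datacontenttype || "application/json";
  event.data = data;
  return event;
}

console.log(JSON.stringify(toCloudEvent({ TotalCost: 35 }, {
  Id: "orders/1-A",
  Type: "com.example.promotions",
  Source: "/promotion-campaigns/summer-sale",
})));
```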
The following is a sample script that processes documents from the Orders collection:
// Create an orderData object
// ==========================
var orderData = {
Id: id(this),
OrderLinesCount: this.Lines.length,
TotalCost: 0
};
// Update the orderData's TotalCost field
// ======================================
for (var i = 0; i < this.Lines.length; i++) {
var line = this.Lines[i];
var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
orderData.TotalCost += cost;
}
// Load the object to the "OrdersQueue" in Azure
// =============================================
loadToOrdersQueue(orderData, {
Id: id(this),
Type: 'com.example.promotions',
Source: '/promotion-campaigns/summer-sale'
})
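To see what the sample script produces, it can be run locally in Node.js against a made-up Order document. In the sketch below, `id()` and `loadToOrdersQueue()` are hypothetical stubs standing in for what the ETL engine provides (the stub reads the ID from `@metadata`, which is an assumption), and `transformOrder` is a wrapper name chosen for illustration so that `this` refers to the source document. RavenDB executes scripts in its own JavaScript engine, so treat this only as a way to sanity-check the transformation logic.

```javascript
// Hypothetical stand-ins for functions the ETL engine provides.
const loaded = [];
function id(doc) {
  return doc["@metadata"]["@id"];
}
function loadToOrdersQueue(obj, attributes) {
  loaded.push({ queue: "OrdersQueue", obj, attributes });
}

// The sample script body, wrapped so that `this` is the source document.
function transformOrder() {
  var orderData = {
    Id: id(this),
    OrderLinesCount: this.Lines.length,
    TotalCost: 0,
  };
  for (var i = 0; i < this.Lines.length; i++) {
    var line = this.Lines[i];
    var cost = line.Quantity * line.PricePerUnit * (1 - line.Discount);
    orderData.TotalCost += cost;
  }
  loadToOrdersQueue(orderData, {
    Id: id(this),
    Type: "com.example.promotions",
    Source: "/promotion-campaigns/summer-sale",
  });
}

// Made-up Order document: 2 x 10 with no discount, 1 x 20 with 25% off.
const order = {
  "@metadata": { "@id": "orders/1-A" },
  Lines: [
    { Quantity: 2, PricePerUnit: 10, Discount: 0 },
    { Quantity: 1, PricePerUnit: 20, Discount: 0.25 },
  ],
};

transformOrder.call(order);
console.log(loaded[0].obj); // { Id: 'orders/1-A', OrderLinesCount: 2, TotalCost: 35 }
```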
Note:
The queue name defined in the transform script must follow the set of rules outlined in:
Naming Queues and Metadata.
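Azure's queue naming rules are: 3 to 63 characters; only lowercase letters, digits, and hyphens; the first and last characters must be a letter or digit; and no consecutive hyphens. The sketch below encodes these rules in a single regular expression; `isValidAzureQueueName` is a hypothetical helper. It applies the rules literally, so a mixed-case name such as `OrdersQueue` fails the check. Whether your RavenDB version normalizes such names before addressing the Azure queue is not covered here, so verify that behavior before relying on mixed-case names.

```javascript
// ^[a-z0-9]     first character: lowercase letter or digit
// (?!.*--)      no consecutive hyphens anywhere after it
// [a-z0-9-]{1,61} middle characters (keeps total length at 3..63)
// [a-z0-9]$     last character: lowercase letter or digit
const AZURE_QUEUE_NAME = /^[a-z0-9](?!.*--)[a-z0-9-]{1,61}[a-z0-9]$/;

function isValidAzureQueueName(name) {
  return AZURE_QUEUE_NAME.test(name);
}

console.log(isValidAzureQueueName("orders-queue")); // true
console.log(isValidAzureQueueName("OrdersQueue"));  // false (uppercase letters)
```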