Queue Sink: RabbitMQ
- RabbitMQ brokers are designed to disperse data to multiple queues, making for a flexible data channeling system that can easily handle complex message streaming scenarios.
- RavenDB can harness the advantages presented by RabbitMQ brokers both as a producer (by running ETL tasks) and as a consumer (using a sink task to consume enqueued messages).
- To use RavenDB as a consumer, define an ongoing Sink task that will read batches of JSON-formatted messages from RabbitMQ queues, construct documents using user-defined scripts, and store the documents in RavenDB collections.
- In this page:
  - The RabbitMQ Sink Task
  - Client API
  - Configuration Options
The RabbitMQ Sink Task
Users of RavenDB 6.0 and above can create an ongoing Sink task that connects to a RabbitMQ broker, retrieves messages from selected queues, runs a user-defined script to manipulate data and construct documents, and potentially stores the created documents in RavenDB collections.
Connecting to a RabbitMQ broker
In the message broker architecture, RavenDB sinks take the role of data consumers. A sink connects to a RabbitMQ broker using a connection string and retrieves messages from the broker's queues.
Read below about adding a connection string via the API.
Read here about adding a connection string using Studio.
Retrieving messages from RabbitMQ queues
When a message is sent to a RabbitMQ broker by a producer, it is pushed to the tail of a queue. As preceding messages are pulled, the message advances up the queue until it reaches its head and can be consumed by RavenDB's sink.
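For context, messages reach these queues because some producer application publishes them to the broker. Here is a minimal producer sketch, not part of RavenDB itself; it assumes the RabbitMQ.Client 6.x .NET library, and the "orders" queue name and message contents are illustrative only:
using System;
using System.Text;
using RabbitMQ.Client;

// Publish a JSON message to the "orders" queue so a RavenDB sink task
// subscribed to that queue can later consume it.
var factory = new ConnectionFactory { Uri = new Uri("amqp://guest:guest@localhost:5672/") };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Make sure the queue exists before publishing to it
channel.QueueDeclare(queue: "orders", durable: true, exclusive: false, autoDelete: false);

var body = Encoding.UTF8.GetBytes(@"{ ""Id"": 13, ""FirstName"": ""John"", ""LastName"": ""Doe"" }");
channel.BasicPublish(exchange: "", routingKey: "orders", basicProperties: null, body: body);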
Running user-defined scripts
A sink task's script is a JavaScript segment. Its basic role is to retrieve selected RabbitMQ messages or message properties, and construct documents that will then be stored in RavenDB.
The script can simply store the whole message as a document, as in this segment:
// Add a `@collection` metadata property to keep the document in this
// collection (if it is not set, the document is stored in @empty).
this['@metadata']['@collection'] = 'Orders';
// Store the message as is, using its Id property as its RavenDB Id as well.
put(this.Id.toString(), this)
But the script can also retrieve just some of the information included in the read message and construct a new document that doesn't resemble the original message. Scripts often include two sections: one that creates a JSON object defining the document's structure and contents, and another that stores the document.
E.g., for RabbitMQ messages of this format -
{
    "Id" : 13,
    "FirstName" : "John",
    "LastName" : "Doe"
}
We can create this script -
var item = {
    Id : this.Id,
    FirstName : this.FirstName,
    LastName : this.LastName,
    FullName : this.FirstName + ' ' + this.LastName,
    "@metadata" : {
        "@collection" : "Users"
    }
};

// Use .toString() to pass the Id as a string even if RabbitMQ provides it as a number
put(this.Id.toString(), item)
The script can also apply various other JavaScript commands, including load to load a RavenDB document (e.g. to construct a document that includes data from the retrieved message and complementary data from existing RavenDB documents), del to remove existing RavenDB documents, and many others.
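For example, here is a sketch of a script that uses load to enrich the incoming message with data from an existing RavenDB document. It is illustrative only: the 'users/' document ID prefix and the Visits property are assumptions, and it presumes load returns null when the document does not exist.
// Load an existing RavenDB document related to the message (illustrative ID)
var existingUser = load('users/' + this.Id);

var item = {
    Id : this.Id,
    FirstName : this.FirstName,
    LastName : this.LastName,
    // Complement the message with data from the loaded document, if any
    PreviousVisits : existingUser ? existingUser.Visits : 0,
    "@metadata" : {
        "@collection" : "Users"
    }
};

put(this.Id.toString(), item)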
Storing documents in RavenDB collections
The sink task consumes batches of queued messages and stores them in RavenDB
in a transactional manner, processing either the entire batch or none of it.
Exceptions to this rule
Some script processing errors are allowed; when such an error occurs RavenDB
will skip the affected message, record the event in the logs, and alert the
user in Studio, but continue processing the batch.
Once a batch is consumed, the task confirms it by sending _channel.BasicAck.
Note that the number of documents included in a batch is configurable (see QueueSink.MaxBatchSize under Configuration Options below).
Take care of duplicates
Producers may enqueue multiple instances of the same document. If processing each message only once is important to the consumer, it is the consumer's responsibility to verify the uniqueness of each consumed message.
Note that as long as the Id property of RabbitMQ messages is preserved (so duplicate messages share an Id), the script's put(ID, { ... }) command will overwrite a previous document with the same Id, so only one copy of it will remain.
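If overwriting is not the desired behavior, the script itself can check whether a document for the message already exists and skip it. This is only a minimal sketch; it assumes the message Id is also used as the document ID and that load returns null for documents that do not exist:
var id = this.Id.toString();

// Store the document only if it was not already stored by a previous message
if (!load(id)) {
    this['@metadata']['@collection'] = 'Orders';
    put(id, this);
}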
Client API
Add a RabbitMQ Connection String
Prior to defining a RabbitMQ sink task, add a RabbitMQ connection string that the task will use to connect to the message broker.
To create the connection string:
- Create a QueueConnectionString instance with the connection string configuration.
- Pass it to the PutConnectionStringOperation store operation to add the connection string.

QueueConnectionString:
// Add RabbitMQ connection string
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(
        new QueueConnectionString
        {
            Name = "RabbitMqConStr",
            BrokerType = QueueBrokerType.RabbitMq,
            RabbitMqConnectionSettings = new RabbitMqConnectionSettings()
            {
                ConnectionString = "amqp://guest:guest@localhost:5672/"
            }
        }));
QueueBrokerType:
public enum QueueBrokerType
{
    None,
    Kafka,
    RabbitMq
}
Property                   | Type                         | Description
---------------------------|------------------------------|------------------------------------------------------------------
Name                       | string                       | Connection string name
BrokerType                 | QueueBrokerType              | Set to QueueBrokerType.RabbitMq for a RabbitMQ connection string
RabbitMqConnectionSettings | RabbitMqConnectionSettings[] | A list of RabbitMQ broker connection details
Add a RabbitMQ Sink Task
To create the Sink task:
- Create QueueSinkScript instances to define scripts with which the task can process retrieved messages, apply JavaScript commands, construct documents, and store them in RavenDB.
// Define a Sink script
QueueSinkScript queueSinkScript = new QueueSinkScript
{
    // Script name
    Name = "orders",
    // A list of RabbitMQ queues to connect
    Queues = new List<string>() { "orders" },
    // Apply this script
    Script = @"this['@metadata']['@collection'] = 'Orders';
               put(this.Id.toString(), this)"
};
- Prepare a QueueSinkConfiguration object with the sink task configuration.

  QueueSinkConfiguration properties:

  Property             | Type                  | Description
  ---------------------|-----------------------|----------------------------------------------------------------
  Name                 | string                | The sink task name
  ConnectionStringName | string                | The registered connection string name
  BrokerType           | QueueBrokerType       | Set to QueueBrokerType.RabbitMq to define a RabbitMQ sink task
  Scripts              | List<QueueSinkScript> | A list of scripts
A list of scripts -
Pass this object to the
AddQueueSinkOperation
store operation to add the Sink task.QueueSinkScript
properties:Property Type Description Name string
Script name Queues List<string>
A list of RabbitMQ queues to consume messages from Script string
The script contents
Code Sample:
// Add RabbitMQ connection string
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(
        new QueueConnectionString
        {
            Name = "RabbitMqConStr",
            BrokerType = QueueBrokerType.RabbitMq,
            RabbitMqConnectionSettings = new RabbitMqConnectionSettings()
            { ConnectionString = "amqp://guest:guest@localhost:5672/" }
        }));

// Define a Sink script
QueueSinkScript queueSinkScript = new QueueSinkScript
{
    // Script name
    Name = "orders",
    // A list of RabbitMQ queues to connect
    Queues = new List<string>() { "orders" },
    // Apply this script
    Script = @"this['@metadata']['@collection'] = 'Orders';
               put(this.Id.toString(), this)"
};

// Define a RabbitMQ sink configuration
var config = new QueueSinkConfiguration()
{
    // Sink task name
    Name = "RabbitMqSinkTaskName",
    // The connection string to connect the broker with
    ConnectionStringName = "RabbitMqConStr",
    // The queue broker type this task is using
    BrokerType = QueueBrokerType.RabbitMq,
    // The list of scripts to run
    Scripts = { queueSinkScript }
};

AddQueueSinkOperationResult addQueueSinkOperationResult =
    store.Maintenance.Send(new AddQueueSinkOperation<QueueConnectionString>(config));
Configuration Options
Use these configuration options to gain more control over queue sink tasks.
- QueueSink.MaxBatchSize
  The maximum number of pulled messages consumed in a single batch.
- QueueSink.MaxFallbackTimeInSec
  The maximum number of seconds the Queue Sink process will be in a fallback mode (i.e. suspending the process) after a connection failure.
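Like other RavenDB configuration options, these can be set, for example, in the server's settings.json file. The values below are illustrative only, not defaults:
{
    "QueueSink.MaxBatchSize": 8192,
    "QueueSink.MaxFallbackTimeInSec": 60
}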