Data Subscriptions: Consumption API Overview



Create the subscription worker

A subscription worker can be created using the following getSubscriptionWorker methods available through the subscriptions property of the documentStore.

Note: Creating the worker is not enough on its own;
you must also run the subscription worker to start document processing.

await documentStore.subscriptions.getSubscriptionWorker(subscriptionName);
await documentStore.subscriptions.getSubscriptionWorker(subscriptionName, database);

await documentStore.subscriptions.getSubscriptionWorker(options);
await documentStore.subscriptions.getSubscriptionWorker(options, database);
| Parameter | Type | Description |
| --- | --- | --- |
| subscriptionName | string | The name of the subscription to which the worker will connect. |
| database | string | The name of the database where the subscription task resides. If null, the default database configured in DocumentStore will be used. |
| options | object | Subscription worker options object that affects how the worker interacts with the subscription. These options do not alter the definition of the subscription itself. |

| Return value | Description |
| --- | --- |
| SubscriptionWorker | The subscription worker that has been created. The worker starts processing documents once you use the worker's on method to attach a handler to the batch event. |
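
The overloads above can be wrapped in a small helper, shown here as a minimal sketch. The helper name createWorker is hypothetical and not part of the client API, and documentStore is assumed to be an already initialized DocumentStore. The helper returns whatever getSubscriptionWorker returns, so callers can await the result as in the signatures above.

```javascript
// Sketch only: `createWorker` is a hypothetical helper, not part of the client API.
// `documentStore` is assumed to be an initialized DocumentStore instance.
function createWorker(documentStore, nameOrOptions, database) {
    const subscriptions = documentStore.subscriptions;

    // Pass `database` only when it is given, so that the default database
    // configured in the DocumentStore is used otherwise.
    return database == null
        ? subscriptions.getSubscriptionWorker(nameOrOptions)
        : subscriptions.getSubscriptionWorker(nameOrOptions, database);
}
```

A call might look like createWorker(documentStore, "OrdersSubscription") or createWorker(documentStore, { subscriptionName: "OrdersSubscription" }, "Shop"), where the subscription and database names are placeholders.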

Subscription worker options

// The SubscriptionWorkerOptions object:
// =====================================
{
    subscriptionName;
    documentType;
    ignoreSubscriberErrors;
    closeWhenNoDocsLeft;
    maxDocsPerBatch;
    timeToWaitBeforeConnectionRetry;
    maxErroneousPeriod;            
    strategy;
}

When creating a worker with subscription worker options, the only mandatory property is subscriptionName.
All other properties are optional and fall back to their default values when not specified.

| Member | Type | Description |
| --- | --- | --- |
| subscriptionName | string | The name of the subscription to which the worker will connect. |
| documentType | object | The class type of the subscription documents. |
| ignoreSubscriberErrors | boolean | Determines whether subscription processing is aborted when the worker's batch-handling code throws an unhandled exception. true: subscription processing continues. false (default): subscription processing is aborted. |
| closeWhenNoDocsLeft | boolean | Determines whether the subscription connection closes when no new documents are available. true: the subscription worker processes all available documents and stops when none remain, at which point a SubscriptionClosedException is thrown. Useful for ad-hoc, one-time processing. false (default): the subscription worker remains active, waiting for new documents. |
| maxDocsPerBatch | number | The maximum number of documents that the server will try to retrieve and send to the client in a batch. If the server doesn't find as many documents as specified, it will send the documents it has found without waiting. Default: 4096. |
| timeToWaitBeforeConnectionRetry | number | The time (in ms) to wait before attempting to reconnect after a non-aborting failure during subscription processing. Default: 5000 (5 seconds). |
| maxErroneousPeriod | number | The maximum amount of time (in ms) a subscription connection can remain in an erroneous state before it is terminated. Default: 300000 (5 minutes). |
| strategy | string | Configures how the server handles connection attempts from workers to a specific subscription task. Available options: OpenIfFree (default), TakeOver, WaitForFree, or Concurrent. Learn more in worker strategies. |
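
As an illustration of the options described above, the following sketch builds an options object for an ad-hoc, one-time run. The helper buildWorkerOptions, the constant DOCUMENTED_DEFAULTS, and the subscription name are hypothetical. Spelling out the defaults is not required, since unspecified properties fall back to their defaults; they are listed here only to show the documented values with durations in milliseconds.

```javascript
// Defaults as documented above, with durations written out in milliseconds.
const DOCUMENTED_DEFAULTS = {
    ignoreSubscriberErrors: false,
    closeWhenNoDocsLeft: false,
    maxDocsPerBatch: 4096,
    timeToWaitBeforeConnectionRetry: 5 * 1000, // 5 seconds
    maxErroneousPeriod: 5 * 60 * 1000,         // 5 minutes
    strategy: "OpenIfFree"
};

// Hypothetical helper: merges overrides onto the documented defaults.
// subscriptionName is the only mandatory property.
function buildWorkerOptions(subscriptionName, overrides = {}) {
    if (typeof subscriptionName !== "string" || subscriptionName.length === 0) {
        throw new Error("subscriptionName is mandatory");
    }
    return { ...DOCUMENTED_DEFAULTS, ...overrides, subscriptionName };
}

// Options for a one-time run that stops when no documents remain.
const oneTimeOptions = buildWorkerOptions("OrdersSubscription", {
    closeWhenNoDocsLeft: true,
    maxDocsPerBatch: 20
});
```

The resulting object can then be passed to getSubscriptionWorker(options).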

Run the subscription worker

A newly created subscription worker does not process any documents yet.
To start processing, define an event handler and attach it to the worker's batch event.

This handler contains your client-side code responsible for processing the document batches received from the server. Whenever a new batch of documents is ready, the provided handler will be triggered.

subscriptionWorker.on("batch", (batch, callback) => {
    // Process incoming items:
    // =======================
    
    // 'batch': 
    // Contains the documents to be processed.
    
    // callback(): 
    // Needs to be called after processing the batch 
    // to notify the worker that you're done processing.
});

Subscription batch

The subscription batch class contains the following public properties & methods:

| Property | Type | Description |
| --- | --- | --- |
| items | Item[] | List of items in the batch. See subscription batch item. |

| Method | Return type | Description |
| --- | --- | --- |
| getNumberOfItemsInBatch() | number | Get the number of items in the batch. |
| getNumberOfIncludes() | number | Get the number of included documents in the batch. |
| openSession() | object | Open a new document session that tracks all items and their included items within the current batch. |
| openSession(options) | object | Open a new document session - can pass session options. |
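
A batch handler built on these members might look like the following sketch. The helper attachBatchHandler and the processDocument parameter are hypothetical names, and worker is assumed to be a SubscriptionWorker. Passing an error to callback follows the usual Node.js callback convention and is an assumption here, not something stated above.

```javascript
// Hypothetical helper: attaches a "batch" handler that processes every item in the batch.
// `worker` is assumed to be a SubscriptionWorker; `processDocument` is your own function.
function attachBatchHandler(worker, processDocument) {
    worker.on("batch", (batch, callback) => {
        console.log(
            `Received ${batch.getNumberOfItemsInBatch()} items, ` +
            `${batch.getNumberOfIncludes()} includes`);

        try {
            for (const item of batch.items) {
                processDocument(item.result, item.id);
            }
            // Notify the worker that this batch has been processed.
            callback();
        } catch (err) {
            // Assumption: the callback accepts an error as its first argument.
            callback(err);
        }
    });
}
```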

Subscription worker connectivity

As long as there is no exception, the worker will continue addressing the same server that the first batch was received from.
If the worker fails to reach that node, it will try to fail over to another node from the session's topology list.
The node that the worker succeeds in connecting to will inform the worker which node is currently responsible for data subscriptions.

Subscription batch item

This class represents a single item in a subscription batch result.

class Item
{
    result;
    exceptionMessage;
    id;
    changeVector;
    projection;
    revision;
    rawResult;
    rawMetadata;
    metadata;
}
| Member | Type | Description |
| --- | --- | --- |
| result | object | The current batch item. |
| exceptionMessage | string | The message of the exception thrown on the server side while processing the current document. |
| id | string | The document ID of the underlying document for the current batch item. |
| changeVector | string | The change vector of the underlying document for the current batch item. |
| rawResult | object | Current batch item - no types reconstructed. |
| rawMetadata | object | Current batch item's underlying document metadata. |
| metadata | object | Current batch item's underlying metadata values. |
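
The item members can be used to separate documents that are ready for processing from those the server failed to process. The sketch below uses a hypothetical helper named partitionItems and assumes that exceptionMessage is empty or undefined when no server-side error occurred.

```javascript
// Hypothetical helper: splits batch items into processable documents
// and items for which the server reported a processing error.
function partitionItems(items) {
    const ok = [];
    const failed = [];

    for (const item of items) {
        if (item.exceptionMessage) {
            // Assumption: a non-empty exceptionMessage marks a failed item.
            failed.push({ id: item.id, reason: item.exceptionMessage });
        } else {
            ok.push({
                id: item.id,
                changeVector: item.changeVector,
                document: item.result
            });
        }
    }

    return { ok, failed };
}
```

Inside a batch handler this could be called as partitionItems(batch.items).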

Subscription worker

Methods
| Method | Return type | Description |
| --- | --- | --- |
| dispose() | void | Aborts subscription worker operation. |
| on() | object | Method used to set up event listeners & handlers. |
| getWorkerId() | string | Get the worker ID. |

Events
| Event | Listener signature | Description |
| --- | --- | --- |
| "batch" | (batch, callback) => void | Emitted when a batch of documents is sent from the server to the client. Once processing is done, callback must be called so that further batches are emitted. |
| "afterAcknowledgment" | (batch, callback) => void | Emitted each time the server acknowledges the progress of batch processing. |
| "connectionRetry" | (error) => void | Emitted when the worker attempts to reconnect to the server after a failure. |
| "error" | (error) => void | Emitted on subscription errors. |
| "end" | (error) => void | Emitted when the subscription is finished. No more batches will be emitted. |
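
The non-batch events are convenient for basic monitoring. The following sketch uses a hypothetical helper named monitorWorker with an injectable log function. It assumes that the error objects carry a message property and that the "end" listener receives an error only when the subscription ended because of a failure.

```javascript
// Hypothetical helper: wires up the non-batch events for basic monitoring.
// `worker` is assumed to be a SubscriptionWorker; `log` defaults to console.log.
function monitorWorker(worker, log = console.log) {
    worker.on("connectionRetry", (error) => {
        log(`Reconnecting after failure: ${error.message}`);
    });

    worker.on("error", (error) => {
        log(`Subscription error: ${error.message}`);
    });

    worker.on("end", (error) => {
        // Assumption: `error` is set only when the subscription ended due to a failure.
        log(error
            ? `Subscription ended with error: ${error.message}`
            : "Subscription ended");
    });
}
```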

Properties
| Member | Type | Description |
| --- | --- | --- |
| currentNodeTag | string | The node tag of the current RavenDB server handling the subscription. |
| subscriptionName | string | The name of the currently processed subscription. |