Concurrent Subscriptions


  • With Concurrent Subscriptions, multiple data subscription workers can connect to the same subscription task simultaneously.

  • Each worker is assigned a different batch of documents to process.

  • By processing different batches in parallel, multiple workers can significantly accelerate the consumption of the subscription's contents.

  • Documents that were assigned to workers whose connection has ended unexpectedly
    can be reassigned by the server to available workers. See Connection failure below.


Defining concurrent workers

Concurrent workers are defined similarly to other workers, except their strategy is set to Concurrent.

  • To define a concurrent worker, set strategy to "Concurrent" in the worker options and pass the options to getSubscriptionWorker.

  • Usage:

    • Define two concurrent workers
      // Define 2 concurrent subscription workers
      // ========================================
      
      const options = {
          // Set concurrent strategy
          strategy: "Concurrent", 
          subscriptionName: "Get all orders",
          maxDocsPerBatch: 20
      };
      
      const worker1 = documentStore.subscriptions.getSubscriptionWorker(options);
      const worker2 = documentStore.subscriptions.getSubscriptionWorker(options);
    • Run both workers
      worker1.on("batch", (batch, callback) => {
          try {
              for (const item of batch.items) {
                  // Process item
              }
              callback();
      
          } catch(err) {
              callback(err);
          }
      });
      
      worker2.on("batch", (batch, callback) => {
          try {
              for (const item of batch.items) {
                  // Process item
              }
              callback();
      
          } catch(err) {
              callback(err);
          }
      });
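
Both workers above register identical handler logic, so it can be factored into one helper. The following is a minimal sketch, not part of the client API: `attachBatchHandler` and `processItem` are hypothetical names introduced here, and the helper assumes only that the worker exposes `on("batch", ...)` with a `(batch, callback)` handler, as in the example above. The commented usage lines also assume that each batch item exposes an `id` property.

```javascript
// Hypothetical helper: registers the same batch-processing logic on a worker.
// processItem is an assumed user-supplied function, called once per batch item.
function attachBatchHandler(worker, processItem) {
    worker.on("batch", (batch, callback) => {
        try {
            for (const item of batch.items) {
                processItem(item);
            }
            // Signal that the whole batch was processed successfully
            callback();
        } catch (err) {
            // Report the failure instead of signaling success
            callback(err);
        }
    });
}

// Usage with the two workers defined above (item.id is an assumption):
// attachBatchHandler(worker1, item => console.log("worker1:", item.id));
// attachBatchHandler(worker2, item => console.log("worker2:", item.id));
```

Keeping the handler in one place means a change to error handling or acknowledgment applies to every concurrent worker at once.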

Dropping a connection

  • Use dropSubscriptionWorker to forcefully disconnect the specified worker from the subscription it is connected to.

  • Use dropConnection to disconnect ALL workers connected to the specified subscription.

// Drop connection for worker2
await documentStore.subscriptions.dropSubscriptionWorker(worker2);

// Available overloads:
dropConnection(options);
dropConnection(options, database);
dropSubscriptionWorker(worker);
dropSubscriptionWorker(worker, database);

Connection failure

  • When a concurrent worker's connection ends unexpectedly, the server may reassign the documents this worker has been processing to any other concurrent worker that is available.
  • A worker that reconnects after a connection failure will be assigned a new batch of documents.
    It is not guaranteed that the new batch will contain the same documents this worker was processing before the disconnection.
  • As a result, documents may be processed more than once:
    • first by a worker that disconnected unexpectedly without acknowledging the completion of its assigned documents,
    • and later by other workers the documents are reassigned to.
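
Because a document may be delivered more than once, item processing is safest when it tolerates repeats. The following is a minimal sketch of one way to skip repeated deliveries, under stated assumptions: `createDedupingProcessor` and `processItem` are hypothetical names introduced here, each batch item is assumed to expose `id` and `changeVector` properties, and all workers are assumed to run in the same process so that they can share the in-memory set. Workers running in separate processes would need a shared store or naturally idempotent operations instead.

```javascript
// Hypothetical helper: wraps processItem so that an item already handled in
// this process (same id and change vector) is skipped when delivered again.
function createDedupingProcessor(processItem) {
    // In-memory only: lost on restart and grows without bound in this sketch
    const handled = new Set();

    return function processOnce(item) {
        const key = `${item.id}|${item.changeVector}`;
        if (handled.has(key)) {
            return false; // Repeated delivery, skipped
        }
        processItem(item);
        // Mark as handled only after processItem returns without throwing,
        // so an item that failed is processed again on redelivery
        handled.add(key);
        return true;
    };
}

// Usage: share one processor between the handlers of all concurrent workers
// const processOnce = createDedupingProcessor(item => { /* Process item */ });
// for (const item of batch.items) { processOnce(item); }
```

Including the change vector in the key means a document modified after it was first handled is treated as new work rather than as a repeat.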