Data Subscriptions: Concurrent Subscriptions


  • With Concurrent Subscriptions, multiple data subscription workers can connect to a common subscription task simultaneously.
  • Different concurrent workers are given different document batches to process.
  • With multiple concurrent workers that process different document batches in parallel, the consumption of a subscription's contents can be greatly accelerated.
  • Documents that were assigned to a worker whose connection ended unexpectedly can be reassigned by the server to other available workers.


Defining Concurrent Workers

Concurrent workers are defined like other workers, except that their strategy is set to SubscriptionOpeningStrategy.Concurrent.

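  • To define a concurrent worker:
    • Create the worker with Subscriptions.GetSubscriptionWorker, passing a SubscriptionWorkerOptions instance that names the subscription task.
    • Set the Strategy option to SubscriptionOpeningStrategy.Concurrent.
    • Start the worker by calling Run with a batch-processing handler.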

  • Usage:

    • Define two concurrent workers
      // Define concurrent subscription workers
      var subscriptionWorker1 = store.Subscriptions.GetSubscriptionWorker<Order>(
          // Set the worker to connect to the "All Orders" subscription task
          new SubscriptionWorkerOptions("All Orders")
          {
              // Set Concurrent strategy
              Strategy = SubscriptionOpeningStrategy.Concurrent,
              MaxDocsPerBatch = 20
          });
      
      var subscriptionWorker2 = store.Subscriptions.GetSubscriptionWorker<Order>(
          new SubscriptionWorkerOptions("All Orders")
          {
              Strategy = SubscriptionOpeningStrategy.Concurrent,
              MaxDocsPerBatch = 20
          });
    • Run both workers
      // Start the concurrent workers. Both workers connect concurrently to the "All Orders" subscription task.
      var subscriptionRuntimeTask1 = subscriptionWorker1.Run(batch =>
      {
          // process batch
          foreach (var item in batch.Items)
          {
              // process item
          }
      });
      
      var subscriptionRuntimeTask2 = subscriptionWorker2.Run(batch =>
      {
          // process batch
          foreach (var item in batch.Items)
          {
              // process item
          }
      });
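
The two-worker example above generalizes to any number of workers. The following is a minimal sketch, assuming an "All Orders" subscription task already exists and that the application defines an Order class (a stand-in is declared here). ConcurrentWorkers, RunAsync, and workerCount are hypothetical names introduced for illustration and are not part of the RavenDB client API.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Subscriptions;

// Assumed document type; the real Order class is defined by the application.
public class Order
{
    public string Id { get; set; }
}

// Hypothetical helper, not part of the RavenDB client API.
public static class ConcurrentWorkers
{
    public static async Task RunAsync(IDocumentStore store, int workerCount)
    {
        var workers = new List<SubscriptionWorker<Order>>();
        var runtimeTasks = new List<Task>();

        try
        {
            for (var i = 0; i < workerCount; i++)
            {
                // Every worker targets the same task and uses the Concurrent strategy
                var worker = store.Subscriptions.GetSubscriptionWorker<Order>(
                    new SubscriptionWorkerOptions("All Orders")
                    {
                        Strategy = SubscriptionOpeningStrategy.Concurrent,
                        MaxDocsPerBatch = 20
                    });
                workers.Add(worker);

                // Run returns a task that stays active while the worker is processing
                runtimeTasks.Add(worker.Run(batch =>
                {
                    foreach (var item in batch.Items)
                    {
                        // process item
                    }
                }));
            }

            // Completes when every worker has stopped; rethrows if a worker failed
            await Task.WhenAll(runtimeTasks);
        }
        finally
        {
            // Release the connections held by the workers
            foreach (var worker in workers)
                worker.Dispose();
        }
    }
}
```

Calling `await ConcurrentWorkers.RunAsync(store, 4)` would start four workers against the same subscription task. A suitable worker count depends on the workload and is best chosen by measurement.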

Dropping a Connection

  • Use Subscriptions.DropSubscriptionWorker to forcefully disconnect any concurrent worker from the subscription it is connected to.

    public void DropSubscriptionWorker<T>(SubscriptionWorker<T> worker, string database = null)
  • Usage:

    // Drop a concurrent subscription worker
    store.Subscriptions.DropSubscriptionWorker(subscriptionWorker2);

Connection Failure

  • When a concurrent worker's connection ends unexpectedly, the server may assign the documents this worker has been processing to any other concurrent worker that is available.
  • A worker that reconnects after a connection failure will be assigned a new batch of documents.
    It is not guaranteed that the new batch will contain the same documents this worker had been processing before disconnecting.
  • Documents may therefore be processed more than once:
    • first by a worker that disconnected unexpectedly, without acknowledging that it processed its documents,
    • and then by the workers the documents were reassigned to.
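
Because a document can be delivered more than once, batch handlers are safer when repeated processing is harmless. The following is a minimal sketch of one way to skip repeated deliveries, assuming all workers run in the same process and that remembering document IDs in memory is acceptable. ProcessedDocumentTracker and DuplicateDeliveryDemo are hypothetical names, not part of the RavenDB client API, and the document IDs are made up for the demonstration.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper, not part of the RavenDB client API.
// Remembers which document IDs have already been claimed for processing.
public sealed class ProcessedDocumentTracker
{
    private readonly ConcurrentDictionary<string, byte> _processed =
        new ConcurrentDictionary<string, byte>();

    // Returns true only for the first caller that claims this document ID.
    // TryAdd is atomic, so two workers cannot both claim the same ID.
    public bool TryMarkProcessed(string documentId)
    {
        return _processed.TryAdd(documentId, 0);
    }
}

public static class DuplicateDeliveryDemo
{
    // Simulates a document that is delivered twice after a reassignment
    // and returns how many deliveries were actually handled.
    public static int Run()
    {
        var tracker = new ProcessedDocumentTracker();
        string[] deliveries = { "orders/1-A", "orders/2-A", "orders/2-A", "orders/3-A" };
        var handled = 0;

        foreach (var id in deliveries)
        {
            if (tracker.TryMarkProcessed(id) == false)
            {
                Console.WriteLine($"Skipping {id}: already processed");
                continue;
            }

            handled++;
            Console.WriteLine($"Processing {id}");
        }

        return handled;
    }
}
```

Inside a worker's batch handler, the same check could be keyed on each item's Id. The sketch marks a document before it is handled, so a handler that fails midway would cause the document to be skipped on redelivery; a production version would record the ID only after success, or make the side effect itself idempotent. Workers on different machines would need a shared store in place of the in-memory dictionary.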