Concurrent Subscriptions
-
With Concurrent Subscriptions, multiple data subscription workers can connect to the same subscription task simultaneously.
-
Each worker is assigned a different batch of documents to process.
-
By processing different batches in parallel, multiple workers can significantly accelerate the consumption of the subscription's contents.
-
Documents that were assigned to workers whose connection has ended unexpectedly,
can be reassigned by the server to available workers. See connection failure below. -
In this page:
Defining concurrent workers
Concurrent workers are defined similarly to other workers, except their strategy is set to SubscriptionOpeningStrategy.Concurrent.
-
To define a concurrent worker:
- Create the worker using GetSubscriptionWorker.
- Pass it a SubscriptionWorkerOptions instance.
- Set the strategy to
SubscriptionOpeningStrategy.Concurrent
-
Usage:
- Define two concurrent workers
// Define concurrent subscription workers var worker1 = store.Subscriptions.GetSubscriptionWorker<Order>( // Set the worker to connect to the "Get all orders" subscription task new SubscriptionWorkerOptions("Get all orders") { // Set Concurrent strategy Strategy = SubscriptionOpeningStrategy.Concurrent, MaxDocsPerBatch = 20 }); var worker2 = store.Subscriptions.GetSubscriptionWorker<Order>( new SubscriptionWorkerOptions("Get all orders") { Strategy = SubscriptionOpeningStrategy.Concurrent, MaxDocsPerBatch = 20 });
- Run both workers
// Start the concurrent worker. // Workers will connect concurrently to the "Get all rders" subscription task. var worker1Task = worker1.Run(batch => { // Process batch foreach (var item in batch.Items) { // Process item } }); var worker2Task = worker2.Run(batch => { // Process batch foreach (var item in batch.Items) { // Process item } });
- Define two concurrent workers
Dropping a connection
-
Use
Subscriptions.DropSubscriptionWorker
to forcefully disconnect the specified worker from the subscription it is connected to.
public void DropSubscriptionWorker<T>(SubscriptionWorker<T> worker, string database = null)
-
Usage:
// Drop a concurrent subscription worker store.Subscriptions.DropSubscriptionWorker(worker2);
Connection failure
- When a concurrent worker's connection ends unexpectedly, the server may reassign the documents this worker has been processing to any other concurrent worker that is available.
- A worker that reconnects after a connection failure will be assigned a new batch of documents.
It is not guaranteed that the new batch will contain the same documents this worker was processing before the disconnection. -
As a result, documents may be processed more than once:
- first by a worker that disconnected unexpectedly without acknowledging the completion of its assigned documents,
- and later by other workers the documents are reassigned to.