Data Subscriptions: Concurrent Subscriptions
- With Concurrent Subscriptions, multiple data subscription workers can connect to a common subscription task simultaneously.
- Different concurrent workers are given different document batches to process.
- With multiple concurrent workers that process different document batches in parallel, the consumption of a subscription's contents can be greatly accelerated.
- Documents that were assigned to workers whose connection ended unexpectedly can be reassigned by the server to available workers.
Defining Concurrent Workers
Concurrent workers are defined like any other worker, except that their strategy is set to SubscriptionOpeningStrategy.Concurrent.
To define a concurrent worker:
- Create the worker using GetSubscriptionWorker.
- Pass it a SubscriptionWorkerOptions instance.
- Set the strategy to SubscriptionOpeningStrategy.Concurrent.
Usage:
- Define two concurrent workers
    // Define concurrent subscription workers
    var subscriptionWorker1 = store.Subscriptions.GetSubscriptionWorker<Order>(
        // Set the worker to connect to the "All Orders" subscription task
        new SubscriptionWorkerOptions("All Orders")
        {
            // Set Concurrent strategy
            Strategy = SubscriptionOpeningStrategy.Concurrent,
            MaxDocsPerBatch = 20
        });

    var subscriptionWorker2 = store.Subscriptions.GetSubscriptionWorker<Order>(
        new SubscriptionWorkerOptions("All Orders")
        {
            Strategy = SubscriptionOpeningStrategy.Concurrent,
            MaxDocsPerBatch = 20
        });
- Run both workers
    // Start the concurrent workers.
    // Both workers connect concurrently to the "All Orders" subscription task.
    var subscriptionRuntimeTask1 = subscriptionWorker1.Run(batch =>
    {
        // Process the batch
        foreach (var item in batch.Items)
        {
            // Process the item
        }
    });

    var subscriptionRuntimeTask2 = subscriptionWorker2.Run(batch =>
    {
        // Process the batch
        foreach (var item in batch.Items)
        {
            // Process the item
        }
    });
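The define-and-run steps above can be generalized to any number of workers. The following is a minimal sketch, not a definitive implementation: it assumes an initialized document store and an existing "All Orders" subscription task. The `Order` class, the `RunWorkersAsync` helper, and the `workerCount` parameter are hypothetical names introduced here for illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Subscriptions;

// Hypothetical stand-in for the application's own document class.
public class Order
{
    public string Id { get; set; }
}

public static class ConcurrentWorkersSketch
{
    // Assumption: `store` is already initialized and a subscription task
    // named "All Orders" exists in its default database.
    public static async Task RunWorkersAsync(IDocumentStore store, int workerCount)
    {
        // Every worker must use the Concurrent strategy to share the subscription.
        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => store.Subscriptions.GetSubscriptionWorker<Order>(
                new SubscriptionWorkerOptions("All Orders")
                {
                    Strategy = SubscriptionOpeningStrategy.Concurrent,
                    MaxDocsPerBatch = 20
                }))
            .ToArray();

        try
        {
            // Start all workers; each one receives different batches.
            var runtimeTasks = workers.Select(worker => worker.Run(batch =>
            {
                foreach (var item in batch.Items)
                {
                    Console.WriteLine($"Processing {item.Id}");
                }
            })).ToArray();

            // Completes only when the workers stop, for example when one
            // of them fails with an error it cannot recover from.
            await Task.WhenAll(runtimeTasks);
        }
        finally
        {
            // Release the connections held by the workers.
            foreach (var worker in workers)
            {
                worker.Dispose();
            }
        }
    }
}
```

Awaiting the runtime tasks together is one way to surface a worker failure to the caller; an application may prefer to observe each task separately so that a single failing worker does not affect how the others are supervised.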
Dropping a Connection
Use Subscriptions.DropSubscriptionWorker to forcefully disconnect any concurrent worker from the subscription it is connected to.
public void DropSubscriptionWorker<T>(SubscriptionWorker<T> worker, string database = null)
Usage:
    // Drop a concurrent subscription worker
    store.Subscriptions.DropSubscriptionWorker(subscriptionWorker2);
Connection Failure
- When a concurrent worker's connection ends unexpectedly, the server may assign the documents this worker has been processing to any other concurrent worker that is available.
- A worker that reconnects after a connection failure will be assigned a new batch of documents.
  It is not guaranteed that the new batch will contain the same documents this worker had been processing before disconnecting.
- It may therefore happen that documents are processed more than once:
  - First by a worker that disconnected unexpectedly, without acknowledging that it had processed its documents,
  - and then by the workers the documents have been reassigned to.
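Because a document may be delivered more than once, batch handlers are safer when processing is idempotent. The sketch below shows one possible approach and is not part of the RavenDB client: `IdempotentProcessor` and `ProcessOnce` are hypothetical names, and the tracker lives in memory, so it only helps workers that run in the same process. In a real handler the key would be taken from the batch item, for example its document ID together with a version marker such as its change vector.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: runs a handler at most once per document version.
public sealed class IdempotentProcessor
{
    // One lock object per document version, so that different documents
    // can still be processed in parallel.
    private readonly ConcurrentDictionary<(string, string), object> _gates =
        new ConcurrentDictionary<(string, string), object>();

    // Document versions whose handler completed successfully.
    private readonly ConcurrentDictionary<(string, string), bool> _done =
        new ConcurrentDictionary<(string, string), bool>();

    // Returns true if the handler ran, false if this version was already handled.
    public bool ProcessOnce(string id, string version, Action handler)
    {
        var key = (id, version);
        var gate = _gates.GetOrAdd(key, _ => new object());

        lock (gate)
        {
            if (_done.ContainsKey(key))
                return false;

            // If the handler throws, the version is not marked as done,
            // so a later redelivery will try again.
            handler();
            _done[key] = true;
            return true;
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var processor = new IdempotentProcessor();
        int handled = 0;

        // First delivery of the document.
        bool first = processor.ProcessOnce("orders/1-A", "A:1", () => handled++);

        // Simulated redelivery of the same version to another worker.
        bool second = processor.ProcessOnce("orders/1-A", "A:1", () => handled++);

        Console.WriteLine($"{first} {second} {handled}"); // prints "True False 1"
    }
}
```

Marking a version as done only after the handler succeeds means that a worker that fails midway does not cause the document to be skipped when it is reassigned. Workers running in separate processes would need a shared store for the same bookkeeping, or handlers whose side effects are idempotent by design.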