How to Consume a Data Subscription
-
Batches of documents sent from a Subscription Task defined on the server are consumed and processed by a subscription worker client.
-
The
SubscriptionWorker
object, defined on the client, manages the communication between the server and the client and processes the document batches sent from the server. -
There are several ways to create and configure the SubscriptionWorker - see SubscriptionWorkerOptions.
-
In this page:
SubscriptionWorker lifecycle
A SubscriptionWorker
object starts its life from being generated by the DocumentsStore.Subscriptions
:
subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
At this point, the worker has only got its configuration. No connection or processing happens at this moment.
To start processing, the Run
method should be called. The Run method receives the batch processing logic that should be performed:
subscriptionRuntimeTask = subscriptionWorker.Run(batch =>
{
// your logic here
});
From this point on, the subscription worker will start processing batches.
If processing is aborted for any reason, the returned task (subscriptionRuntimeTask
) will complete with an exception.
Error handling
Subscription worker connection failures
Subscription worker connection failures may occur during the routine communication between the worker and the server. When an unexpected error arises, the worker will attempt to reconnect to the server.
However, there are several conditions under which the worker will stop its operation but will Not attempt to reconnect:
- The subscription no longer exists or has been deleted.
- Another worker has taken control of the subscription (see connection strategy).
- The worker is unable to connect to any of the servers.
- The worker could not receive the node responsible for the task
(this can happen when there is no leader in the cluster). - An authorization exception occurred.
- An exception occurred during the connection establishment phase.
- The database doesn't exist.
Batch processing execution failures
An exception may occur while processing a batch of documents in the worker. For example:
_ = workerWBatch.Run(x => throw new Exception());
When creating a worker, the worker can be configured to handle these exceptions in either of the following ways,
depending on the IgnoreSubscriberErrors
property in SubscriptionWorkerOptions:
-
Abort processing completely
WhenIgnoreSubscriberErrors = false
(default):
The current batch processing will be aborted, and in this case, the worker will wrap the thrown exception in aSubscriberErrorException
and will rethrow it. Processing of the subscription will be terminated without acknowledging progress to the server or retrying to connect.
As a result, the task returned by theRun
function will complete in an erroneous state, throwing a SubscriberErrorException. -
Continue processing subsequent batches
WhenIgnoreSubscriberErrors = true
:
The current batch processing will be aborted; however, the erroneous batch will be acknowledged without retrying, and processing will continue with the next batches.
Reconnecting
Two properties in the SubscriptionWorkerOptions object control the behavior of a worker attempting to reconnect with the server:
TimeToWaitBeforeConnectionRetry
The time the worker will wait before attempting to reconnect.
Default: 5 seconds.MaxErroneousPeriod
The maximum amount of time the subscription connection can remain in an erroneous state.
Once this period is exceeded, the worker will stop trying to reconnect.
Default: 5 minutes.
Timing out
A worker will time out after losing its connectivity with the server for a given time period.
- The timeout period can be set using the
ConnectionStreamTimeout
option. E.g.:
var options = new SubscriptionWorkerOptions(subscriptionName); // Set the worker's timeout period options.ConnectionStreamTimeout = TimeSpan.FromSeconds(45);
- Default timeout period: 30 second
OnUnexpectedSubscriptionError
OnUnexpectedSubscriptionError
is the event that is triggered when a connection failure occurs between the subscription worker and the server,
resulting in an unexpected exception.
When this happens, the worker will automatically attempt to reconnect.
This event is useful for logging these unexpected exceptions.
Worker strategies
Subscription workers are configured with a strategy that determines whether multiple workers can connect to the subscription concurrently or if only one worker can connect at a time.
The one-worker-at-a-time strategy also determines how the workers interact with each other to resolve which will establish the subscription connection.
One worker per subscription strategies
The following three strategies allow only a single worker to connect to the subscription at any given time,
and determine what happens when one worker is connected and another tries to connect.
SubscriptionOpeningStrategy.OpenIfFree
The server will allow a worker to connect only if no other worker is currently connected.
If there is an existing connection, the incoming worker will throw aSubscriptionInUseException
.SubscriptionOpeningStrategy.WaitForFree
If the worker cannot open the subscription because it is in use by another worker, it will wait for the currently connected worker to disconnect before establishing the connection. This is useful in worker failover scenarios, where one worker is connected while another is awaiting its turn to take its place.-
SubscriptionOpeningStrategy.TakeOver
The server will allow an incoming connection to take over an existing one,
based on the connection strategy in use by the currently connected worker:- If the existing connection does not have a
TakeOver
strategy:
The incoming connection will take over, causing the existing connection to throw aSubscriptionInUseException
. - If the existing connection has a
TakeOver
strategy:
The incoming connection will throw aSubscriptionInUseException
exception.
- If the existing connection does not have a
Multiple workers per subscription strategy
SubscriptionOpeningStrategy.Concurrent
The server allows multiple workers to connect to the same subscription concurrently.
Read more about concurrent subscriptions here.
Determining which workers a subscription will serve
The strategy used by the first worker connecting to a subscription determines which additional workers the subscription can serve until all worker connections are dropped.
-
A subscription that serves one or more concurrent workers, can only serve other concurrent workers until all connections are dropped. If a worker with a one worker per subscription strategy attempts to connect -
- The connection attempt will be rejected.
SubscriptionInUseException
will be thrown.
-
A subscription that serves a worker with a one worker per subscription strategy, cannot serve concurrent workers until that worker's connection is dropped. If a concurrent worker attempts to connect -
- The connection attempt will be rejected.
SubscriptionInUseException
will be thrown.