Data Subscriptions: How to Consume a Data Subscription
Subscriptions are consumed by processing batches of documents received from the server.
A SubscriptionWorker object manages document processing and the communication between the client and the server, according to a set of configurations received upon its creation.
There are several ways to create and configure a SubscriptionWorker, ranging from passing just a subscription name to supplying a detailed configuration object, SubscriptionWorkerOptions.
In this page:
SubscriptionWorker lifecycle
Error handling
Workers interplay
SubscriptionWorker lifecycle
A SubscriptionWorker object starts its life by being generated by DocumentStore.subscriptions:
const subscriptionWorker = store.subscriptions.getSubscriptionWorker({ subscriptionName });
The worker connects to the server asynchronously, once a listener for the "batch" event is registered using the on() method.
subscriptionWorker.on("batch", (batch, callback) => {
    // your logic here

    // to report a batch processing error, pass it to the callback:
    // callback(err)
    callback();
});

subscriptionWorker.on("error", error => {
    // handle errors
});
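The listener above leaves the processing logic open. The following is a minimal sketch of one way to structure it, assuming each batch exposes an items array whose entries carry the document in a result property (verify this against your client version). The names makeBatchHandler and processDocument are hypothetical and introduced only for illustration.

```javascript
// Hypothetical helper: wraps per-document logic in a "batch" listener.
// Assumption: batch.items is an array and each item exposes the document as item.result.
function makeBatchHandler(processDocument) {
    return (batch, callback) => {
        try {
            for (const item of batch.items) {
                processDocument(item.result);
            }
        } catch (err) {
            // Failure: report the error to the worker through the callback.
            callback(err);
            return;
        }
        // Success: signal that the batch was processed.
        callback();
    };
}

// With a real worker this would be registered as:
// subscriptionWorker.on("batch", makeBatchHandler(doc => console.log(doc)));
```

Keeping the callback calls outside the per-document logic means the callback is invoked exactly once per batch, whether processing succeeds or fails.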
From this point on, the subscription worker will start processing batches. If processing is aborted for any reason, an "error" event is emitted with an Error argument.
Error handling
Internal mechanism errors
These errors occur during normal communication between the worker and the server (they are not emitted via the "error" event).
If an unexpected error occurs, the worker will try to reconnect to the server. There are conditions in which the worker will cease its operation and will not try to reconnect:
- The subscription does not exist or was deleted
- Another worker took over the subscription (see connection strategy)
- The worker could not connect to any of the servers
- The worker could not receive the node in charge of the task (this can happen when there is no leader)
- Authorization exception
- Exception during connection establishment
User's batch processing logic unhandled exception
Example:
workerWBatch.on("batch", (batch, callback) => {
    callback(new Error("Error during processing batch."));
});
If an exception is thrown, the worker aborts processing of the current batch. A worker can be configured to treat the thrown exception in either of the following two ways:
- By default, the worker will wrap the thrown exception with a SubscriberErrorException exception and rethrow it, terminating the subscription execution without acknowledging progress or retrying. An "error" event is emitted.
- If the ignoreSubscriberErrors value of SubscriptionWorkerOptions is set to true, the erroneous batch will get acknowledged without retrying and the next batches will continue processing.
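The difference between the two behaviors can be seen in a toy model. This is a sketch only, not the client's actual code: simulateWorker, its return shape, and the id and bad fields on the sample batches are hypothetical, and the model assumes the handler calls its callback synchronously.

```javascript
// Toy model of the two behaviors described above; not the client's implementation.
// Assumption: the handler invokes its callback synchronously.
function simulateWorker(batches, handler, options = {}) {
    const acknowledged = [];
    for (const batch of batches) {
        let failure = null;
        handler(batch, err => { failure = err || null; });
        if (failure && !options.ignoreSubscriberErrors) {
            // Default: stop without acknowledging the failed batch.
            return { acknowledged, stoppedWith: failure };
        }
        // Either the batch succeeded, or its error is being ignored.
        acknowledged.push(batch.id);
    }
    return { acknowledged, stoppedWith: null };
}

// A handler that fails on batches flagged as bad (hypothetical flag).
const handler = (batch, callback) =>
    batch.bad ? callback(new Error("Error during processing batch.")) : callback();
const batches = [{ id: 1 }, { id: 2, bad: true }, { id: 3 }];

console.log(simulateWorker(batches, handler));
console.log(simulateWorker(batches, handler, { ignoreSubscriberErrors: true }));
```

In the first call only batch 1 is acknowledged and the run stops with the error. In the second call all three batches are acknowledged, including the failed one, so the failed batch is never redelivered.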
Reconnecting
The cases above describe situations in which a worker will try to reconnect to the server. Two key SubscriptionWorkerOptions fields control this behavior:
- timeToWaitBeforeConnectionRetry - The time that the worker will 'sleep' before trying to reconnect.
- maxErroneousPeriod - The maximum time that the worker is allowed to be in an erroneous state. After that time passes, the worker will stop trying to reconnect.
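How the two fields interact can be illustrated with a small timing model. This is a sketch under stated assumptions, not the client's retry loop: planReconnects and outageMs are hypothetical names, all times are treated as milliseconds, and the model assumes a retry succeeds as soon as the outage is over.

```javascript
// Toy model of the reconnect policy; not the client's implementation.
// Assumption: all durations are in milliseconds.
function planReconnects(outageMs, options) {
    const wait = options.timeToWaitBeforeConnectionRetry;
    const limit = options.maxErroneousPeriod;
    if (!(wait > 0)) {
        throw new Error("timeToWaitBeforeConnectionRetry must be positive");
    }
    const attempts = [];
    // Sleep, then retry, until the outage is over or the erroneous period is used up.
    for (let elapsed = wait; elapsed <= limit; elapsed += wait) {
        attempts.push(elapsed);
        if (elapsed >= outageMs) {
            return { attempts, reconnected: true };
        }
    }
    return { attempts, reconnected: false };
}

const plan = planReconnects(12000, {
    timeToWaitBeforeConnectionRetry: 5000,
    maxErroneousPeriod: 60000
});
console.log(plan);
```

For a 12-second outage with a 5-second wait, the model retries at 5, 10, and 15 seconds and reconnects on the third attempt. If the outage outlasts maxErroneousPeriod, the model gives up with reconnected set to false.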
Workers interplay
The interplay between workers connecting to the same subscription is configured by the SubscriptionConnectionOptions Strategy field. The strategy field is an enum with the following values:
- OpenIfFree - The server will allow the worker to connect only if no other worker is currently connected. If there is an existing connection, the incoming worker will throw a SubscriptionInUseException.
- WaitForFree - If the client currently cannot open the subscription because it is used by another client, it will wait for the previous client to disconnect and only then will connect. This is useful in client failover scenarios where there is one active client and another one already waiting to take its place.
- TakeOver - The server will allow an incoming connection to overthrow an existing one. It will behave according to the existing connection's strategy:
    - The existing connection has a strategy that is not TakeOver. In this case, the incoming connection will take over, causing the existing connection to throw a SubscriptionInUseException exception.
    - The existing connection has a strategy that is TakeOver. In this case, the incoming connection will throw a SubscriptionInUseException exception.
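The strategy rules above can be condensed into a small decision function. This is a toy model for reasoning about the rules, not the server's implementation: resolveConnection and the outcome strings are hypothetical, and null stands for "no worker is currently connected".

```javascript
// Toy model of the strategy rules; not the server's implementation.
// existingStrategy is null when no worker is currently connected.
function resolveConnection(incomingStrategy, existingStrategy) {
    const known = ["OpenIfFree", "WaitForFree", "TakeOver"];
    if (!known.includes(incomingStrategy)) {
        throw new Error(`Unknown strategy: ${incomingStrategy}`);
    }
    if (existingStrategy === null) {
        // Nothing to contend with: any strategy connects.
        return "incoming connects";
    }
    switch (incomingStrategy) {
        case "OpenIfFree":
            return "incoming throws SubscriptionInUseException";
        case "WaitForFree":
            return "incoming waits for the existing worker to disconnect";
        default: // "TakeOver"
            return existingStrategy === "TakeOver"
                ? "incoming throws SubscriptionInUseException"
                : "incoming connects; existing throws SubscriptionInUseException";
    }
}

console.log(resolveConnection("TakeOver", "OpenIfFree"));
console.log(resolveConnection("TakeOver", "TakeOver"));
```

Note the asymmetry in the last case: a TakeOver worker can displace any worker except another TakeOver worker.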