How to Consume a Data Subscription
Subscriptions are consumed by processing batches of documents received from the server.
A SubscriptionWorker
object manages the documents processing and the communication between the client and the server according to a set of configurations received upon it's creation.
We've introduced several ways to create and configure a SubscriptionWorker, starting from just giving a subscription name, and ending with a detailed configuration object - SubscriptionWorkerOptions
.
- In this page:
SubscriptionWorker lifecycle
A SubscriptionWorker
object starts its life from being generated by the DocumentsStore.subscriptions
:
subscriptionWorker = store.subscriptions().getSubscriptionWorker(Order.class, subscriptionName);
At this point, the worker has only got its configuration. No connection or processing happens at this moment.
In order to start processing, the run
method should be called. The run
method receives the batch processing logic that should be performed:
subscriptionRuntimeTask = subscriptionWorker.run(batch -> {
// your logic here
});
From this point on, the subscription worker will start processing batches. If for any reason, the processing is aborted, the returned task (subscriptionRuntimeTask
) will complete with an exception.
Error handling
Subscription worker connection failures
Subscription worker connection failures may occur during the routine communication between the worker and the server. When an unexpected error arises, the worker will attempt to reconnect to the server.
However, there are several conditions under which the worker will stop its operation but will Not attempt to reconnect:
- The subscription no longer exists or has been deleted.
- Another worker has taken control of the subscription (see connection strategy).
- The worker is unable to connect to any of the servers.
- The worker could not receive the node responsible for the task
(this can happen when there is no leader in the cluster). - An authorization exception occurred.
- An exception occurred during the connection establishment phase.
- The database doesn't exist.
Batch processing execution failures
An exception may occur while processing a batch of documents in the worker. For example:
workerWBatch.run(x -> {
throw new RuntimeException();
});
When creating a worker, the worker can be configured to handle these exceptions in either of the following ways,
depending on the IgnoreSubscriberErrors
property in SubscriptionWorkerOptions:
-
Abort processing completely
WhenIgnoreSubscriberErrors
is set to false (default):
The current batch processing will be aborted, and in this case, the worker will wrap the thrown exception in aSubscriberErrorException
and will rethrow it. Processing of the subscription will be terminated without acknowledging progress to the server or retrying to connect.
As a result, the task returned by theRun
function will complete in an erroneous state, throwing a SubscriberErrorException. -
Continue processing subsequent batches
WhenIgnoreSubscriberErrors
is set to true:
The current batch processing will be aborted; however, the erroneous batch will be acknowledged without retrying, and processing will continue with the next batches.
Reconnecting
Two properties in the SubscriptionWorkerOptions object control the behavior of a worker attempting to reconnect with the server:
timeToWaitBeforeConnectionRetry
The time the worker will wait before attempting to reconnect.
Default: 5 seconds.maxErroneousPeriod
The maximum amount of time the subscription connection can remain in an erroneous state.
Once this period is exceeded, the worker will stop trying to reconnect.
Default: 5 minutes.
Worker strategies
There can only be one active subscription worker working on a subscription.
Nevertheless, there are scenarios where it is required to interact between an existing subscription worker and one that tries to connect.
This relationship and interoperation is configured by the SubscriptionConnectionOptions
Strategy
field.
The strategy field is an enum, having the following values:
OPEN_IF_FREE
- the server will allow the worker to connect only if there isn't any other currently connected workers.
If there is a existing connection, the incoming worker will throw a SubscriptionInUseException.WAIT_FOR_FREE
- If the client currently cannot open the subscription because it is used by another client, it will wait for the previous client to disconnect and only then will connect.
This is useful in client failover scenarios where there is one active client and another one already waiting to take its place.-
TAKE_OVER
- the server will allow an incoming connection to overthrow an existing one. It will behave according to the existing connection strategy:- The existing connection has a strategy that is not
TAKE_OVER
. In this case, the incoming connection will take over it causing the existing connection to throw a SubscriptionInUseException exception. - The existing connection has a strategy that is
TAKE_OVER
. In this case, the incoming connection will throw a SubscriptionInUseException exception.
- The existing connection has a strategy that is not