Data Subscriptions: How to Consume a Data Subscription



SubscriptionWorker lifecycle

A SubscriptionWorker object begins its life when it is created through DocumentStore.Subscriptions:

subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);

At this point, the worker holds only its configuration; no connection is opened and no processing takes place. To start processing, call the Run method, passing it the batch processing logic to execute:

subscriptionRuntimeTask = subscriptionWorker.Run(batch =>
{
    // your logic here
});

From this point on, the subscription worker processes batches as they arrive.
If processing is aborted for any reason, the returned task (subscriptionRuntimeTask) completes in a faulted state, carrying the exception that caused the abort.
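The lifecycle described above can be sketched end to end as follows. This is a minimal sketch, not a definitive implementation: the Order class, the WorkerLifecycleSketch class, and the ConsumeAsync method are hypothetical names introduced for illustration, and the caller is assumed to supply an initialized document store and the name of an existing subscription.

```csharp
using System;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Subscriptions;

// Hypothetical document type, used only for this sketch.
public class Order
{
    public string Id { get; set; }
    public string Company { get; set; }
}

public static class WorkerLifecycleSketch
{
    public static async Task ConsumeAsync(IDocumentStore store, string subscriptionName)
    {
        // Creating the worker only stores its configuration; nothing connects yet.
        using var subscriptionWorker =
            store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);

        // Run opens the connection and starts passing batches to the delegate.
        Task subscriptionRuntimeTask = subscriptionWorker.Run(batch =>
        {
            foreach (var item in batch.Items)
            {
                Console.WriteLine($"Processing order {item.Id}");
            }
        });

        try
        {
            // Awaiting the task surfaces the exception if processing is aborted.
            await subscriptionRuntimeTask;
        }
        catch (Exception e)
        {
            Console.WriteLine($"Subscription processing stopped: {e.Message}");
        }
    }
}
```

Keeping a reference to the returned task, rather than discarding it, is what makes an aborted subscription observable to the calling code.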

Error handling

There are two categories of errors that may occur during subscription processing:

Internal mechanism errors

These errors occur during the normal client-server communication between the worker and the server.
If an unexpected error occurs, the worker will try to reconnect to the server. Under the following conditions, however, the worker will cease its operation and will not try to reconnect:

  • The subscription does not exist or was deleted
  • Another worker took over the subscription (see connection strategy)
  • The worker could not connect to any of the servers
  • The worker could not determine which node is in charge of the task (this can happen when there is no leader)
  • An authorization exception occurred
  • An exception occurred while establishing the connection
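When the worker gives up for one of the reasons listed above, the task returned by Run faults, and the calling code can inspect the exception type to decide what to do next. The helper below is a hedged sketch: SubscriptionFailureSketch and Describe are hypothetical names, the namespaces are the ones the RavenDB C# client is assumed to use for these exception types, and only a few of the listed conditions are mapped.

```csharp
using System;
using Raven.Client.Exceptions.Documents.Subscriptions;
using Raven.Client.Exceptions.Security;

public static class SubscriptionFailureSketch
{
    // Maps an exception from the worker's task to a short description.
    public static string Describe(Exception e)
    {
        if (e is SubscriptionDoesNotExistException)
            return "The subscription does not exist or was deleted.";

        if (e is SubscriptionInUseException)
            return "Another worker holds the subscription.";

        if (e is AuthorizationException)
            return "The client is not authorized to use this subscription.";

        if (e is SubscriberErrorException)
            return "The batch processing logic threw an unhandled exception.";

        // Anything else is reported as-is in this sketch.
        return $"Unclassified failure: {e.GetType().Name}";
    }
}
```

A caller would typically invoke Describe from the catch block that wraps the awaited Run task, then decide whether to recreate the worker or stop.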

User's batch processing logic unhandled exception

Example:

_ = workerWBatch.Run(x => throw new Exception());

If an exception is thrown, the worker will abort processing of the current batch. A worker can be configured to treat the thrown exception in one of the following two ways:

  • By default, the worker will wrap the thrown exception in a SubscriberErrorException and rethrow it,
    terminating the subscription execution without acknowledging progress or retrying. The task returned by the Run method will complete in a faulted state with a SubscriberErrorException.

  • If the IgnoreSubscriberErrors option of SubscriptionWorkerOptions is set to true, the erroneous batch will be acknowledged without being retried, and the worker will continue processing the next batches.
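The second behavior can be enabled when the worker is created. The following is a minimal sketch, assuming the caller provides the store, the subscription name, and the batch logic; IgnoreErrorsSketch and RunIgnoringBatchErrorsAsync are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Subscriptions;

public static class IgnoreErrorsSketch
{
    public static async Task RunIgnoringBatchErrorsAsync<T>(
        IDocumentStore store,
        string subscriptionName,
        Action<SubscriptionBatch<T>> processBatch) where T : class
    {
        var options = new SubscriptionWorkerOptions(subscriptionName)
        {
            // A batch whose processing throws is acknowledged and skipped
            // instead of terminating the worker.
            IgnoreSubscriberErrors = true
        };

        using var worker = store.Subscriptions.GetSubscriptionWorker<T>(options);
        await worker.Run(processBatch);
    }
}
```

Because a failed batch is acknowledged and not retried, this setting suits workloads that can tolerate skipped documents; otherwise the default behavior is the safer choice.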

Reconnecting

The sections above describe situations in which a worker will try to reconnect to the server. Two key SubscriptionWorkerOptions fields control this behavior:

  • TimeToWaitBeforeConnectionRetry - The time that the worker will 'sleep' before trying to reconnect.
  • MaxErroneousPeriod - The maximum time in which the worker is allowed to be in an erroneous state. After that time passes, the worker will stop trying to reconnect.
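Both fields are set on the options object before the worker is created. The values below are illustrative only, and ReconnectOptionsSketch and Create are hypothetical names.

```csharp
using System;
using Raven.Client.Documents.Subscriptions;

public static class ReconnectOptionsSketch
{
    public static SubscriptionWorkerOptions Create(string subscriptionName)
    {
        return new SubscriptionWorkerOptions(subscriptionName)
        {
            // Pause between reconnection attempts (illustrative value).
            TimeToWaitBeforeConnectionRetry = TimeSpan.FromSeconds(10),

            // Stop retrying once the worker has been in an erroneous
            // state for this long (illustrative value).
            MaxErroneousPeriod = TimeSpan.FromMinutes(15)
        };
    }
}
```

The resulting options object can then be passed to store.Subscriptions.GetSubscriptionWorker in place of the subscription name.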

Timing out

A worker will time out after losing its connectivity with the server for a given time period.

  • The timeout period can be set using the ConnectionStreamTimeout option.
    E.g.
    var options = new SubscriptionWorkerOptions(subscriptionName);
    
    // Set the worker's timeout period
    options.ConnectionStreamTimeout = TimeSpan.FromSeconds(45);
  • Default timeout period: 30 seconds

OnUnexpectedSubscriptionError

OnUnexpectedSubscriptionError is an event raised when the connection between the subscription worker and the server fails with an unexpected exception. When this occurs, the worker automatically tries to reconnect. The event is useful for logging these unexpected exceptions.
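A logging handler can be attached before Run is called. This is a sketch under the assumption that the event accepts a handler taking the exception as its only argument; UnexpectedErrorLoggingSketch and RunWithLoggingAsync are hypothetical names, and writing to the console stands in for a real logger.

```csharp
using System;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Subscriptions;

public static class UnexpectedErrorLoggingSketch
{
    public static async Task RunWithLoggingAsync<T>(
        IDocumentStore store,
        string subscriptionName,
        Action<SubscriptionBatch<T>> processBatch) where T : class
    {
        using var worker = store.Subscriptions.GetSubscriptionWorker<T>(subscriptionName);

        // Raised on unexpected connection failures; the worker then
        // attempts to reconnect on its own.
        worker.OnUnexpectedSubscriptionError += exception =>
            Console.Error.WriteLine($"Unexpected subscription error: {exception}");

        await worker.Run(processBatch);
    }
}
```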

Worker interplay

  • Subscription workers are configured with a strategy that determines whether multiple workers can connect to their subscription concurrently, or only one at a time.
  • If a one-at-a-time strategy is chosen, the strategy also determines how the workers interact with each other to decide which of them will connect to the subscription.

The strategy is configured using the Strategy field of SubscriptionWorkerOptions.
The Strategy field is of the enum type SubscriptionOpeningStrategy.

SubscriptionOpeningStrategy
  • OpenIfFree - Connect if no other worker is connected
  • WaitForFree - Wait for the currently connected worker to disconnect
  • TakeOver - Take over the connection
  • Concurrent - Connect concurrently

Available Worker Strategies


One Worker Per Subscription Strategies

The following three strategies allow only a single worker to connect to the subscription at any given time, and determine what happens when one worker is connected and another tries to connect.

  • SubscriptionOpeningStrategy.OpenIfFree
    The server will allow the worker to connect only if there is currently no other connected worker.
    If there is an existing connection, the incoming worker will throw a SubscriptionInUseException.
  • SubscriptionOpeningStrategy.WaitForFree
    If the worker cannot currently open the subscription because another worker is using it, it will wait for that worker to disconnect and only then connect.
    This is useful in worker failover scenarios, where there is one active worker and another already waits to take its place.
  • SubscriptionOpeningStrategy.TakeOver
    The server will allow an incoming connection to take over an existing one, depending on the existing connection's strategy.
    • If the existing connection does not have a TakeOver strategy:
      The incoming connection will take over, causing the existing connection to throw a SubscriptionInUseException.
    • If the existing connection has a TakeOver strategy:
      The incoming connection will throw a SubscriptionInUseException.
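As an illustration of the failover scenario mentioned for WaitForFree, a standby worker might be set up as sketched below. StandbyWorkerSketch and RunStandbyAsync are hypothetical names; the store, subscription name, and batch logic are assumed to be provided by the caller.

```csharp
using System;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Subscriptions;

public static class StandbyWorkerSketch
{
    public static async Task RunStandbyAsync<T>(
        IDocumentStore store,
        string subscriptionName,
        Action<SubscriptionBatch<T>> processBatch) where T : class
    {
        var options = new SubscriptionWorkerOptions(subscriptionName)
        {
            // Wait for the currently connected worker to disconnect
            // instead of failing while the subscription is in use.
            Strategy = SubscriptionOpeningStrategy.WaitForFree
        };

        using var standbyWorker = store.Subscriptions.GetSubscriptionWorker<T>(options);

        // The delegate is expected to start receiving batches only once
        // the active worker's connection has been dropped.
        await standbyWorker.Run(processBatch);
    }
}
```

Swapping the Strategy value for OpenIfFree or TakeOver in the same options object selects the other single-worker behaviors described above.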

Concurrent Strategy

  • SubscriptionOpeningStrategy.Concurrent
    Multiple workers of the same subscription are allowed to connect to it simultaneously.
    Read more in the documentation on concurrent subscriptions.

Determining Which Workers a Subscription Will Serve

The strategy used by the first worker connecting to a subscription will determine which additional workers this subscription will be able to serve until all worker connections are dropped.

  • A subscription that serves one or more Concurrent workers can serve only other Concurrent workers until all worker connections are dropped.
    If a worker that uses a One Worker Per Subscription strategy attempts to connect to it:
    • The connection attempt will be rejected.
    • SubscriptionInUseException will be thrown.
  • A subscription that serves a worker that uses a One Worker Per Subscription strategy cannot serve Concurrent workers until that worker's connection is dropped.
    If a Concurrent worker attempts to connect to it:
    • The connection attempt will be rejected.
    • SubscriptionInUseException will be thrown.