How to open a data subscription?

Once you have a subscription identifier, you can open the subscription. Along with the id, you need to specify some details about the subscription connection:

Subscription<RavenJObject> Open(long id, SubscriptionConnectionOptions options, string database = null);

Subscription<T> Open<T>(long id, SubscriptionConnectionOptions options, string database = null);

Parameters
  id        long                            A data subscription identifier.
  options   SubscriptionConnectionOptions   Connection options.

Return value
  Subscription<RavenJObject> / Subscription<T>   Subscription instance.

There are two methods for opening a subscription. The first handles documents belonging to different collections, so results are returned as RavenJObject objects. The second returns a strongly typed subscription in which retrieved documents are converted to the given type.
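
As a rough sketch, the two overloads could be used side by side as follows. It assumes the Reactive Extensions package is referenced for the delegate-based Subscribe. The Order class, the method name, and the two subscription ids are hypothetical, and the namespaces in the using directives are assumptions that may differ in your client version.

```csharp
using System;
using Raven.Abstractions.Data;   // SubscriptionConnectionOptions (assumed namespace)
using Raven.Client;              // IDocumentStore (assumed namespace)

// Hypothetical document type used only for illustration.
public class Order
{
    public string Id { get; set; }
}

public static class OpenOverloadsSketch
{
    // mixedId and ordersId are assumed to identify two existing subscriptions.
    public static void Run(IDocumentStore store, long mixedId, long ordersId)
    {
        // Untyped overload: each document arrives as a RavenJObject.
        var mixed = store.Subscriptions.Open(mixedId, new SubscriptionConnectionOptions());
        mixed.Subscribe(doc => Console.WriteLine(doc));

        // Typed overload: each document is converted to Order.
        var orders = store.Subscriptions.Open<Order>(ordersId, new SubscriptionConnectionOptions());
        orders.Subscribe(order => Console.WriteLine(order.Id));
    }
}
```

Two different ids are used because a subscription accepts only one open connection at a time.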

Only a single subscription consumer is allowed at a time

There can be only one open connection per subscription. An attempt to open a subscription that is already open will throw an exception.

Documents are sent to a client in batches. SubscriptionConnectionOptions has a BatchOptions property where you can specify:

  • MaxDocCount - max number of docs that can be sent in a single batch (default: 4096),
  • MaxSize - max total batch size in bytes (default: null - no size limit),
  • AcknowledgmentTimeout - max time within which the subscription needs to confirm that the batch has been successfully processed (default: 1 minute).

Additionally, the connection options have the following settings:

  • IgnoreSubscribersErrors - determines if subscription should ignore errors thrown by subscription handlers (default: false),
  • ClientAliveNotificationInterval - specifies how often the subscription sends heartbeats to the server (the server keeps the subscription open as long as the connected client keeps sending these alive notifications; two undelivered notifications allow another client to connect; default: 2 minutes),
  • Strategy - an enum that specifies the subscription opening strategy. Four strategies are available:

    • OpenIfFree - the client will successfully open a subscription only if no other client is currently connected. Otherwise the attempt ends with a SubscriptionInUseException.
    • TakeOver - the connecting client will successfully open a subscription even if another consumer is currently active. When the new client takes over the subscription, the existing one is rejected. The subscription is always processed by the last connected client.
    • ForceAndKeep - a client opening a subscription with this strategy will always get it and keep it open until another client with the same strategy connects.
    • WaitForFree - if the client cannot currently open the subscription because another client is using it, it subscribes to the Changes API to be notified about subscription status changes. Every time a SubscriptionReleased notification arrives, it retries opening the subscription. Once it succeeds, it processes docs as usual.
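
To illustrate how the strategies might be combined, here is a minimal sketch that first tries OpenIfFree and falls back to WaitForFree when the subscription is busy. The enum type name SubscriptionOpeningStrategy and the namespaces are assumptions (the text above names only the strategy values and the exception), and the Order class and method name are hypothetical.

```csharp
using Raven.Abstractions.Data;                       // options and strategy enum (assumed namespace)
using Raven.Abstractions.Exceptions.Subscriptions;   // SubscriptionInUseException (assumed namespace)
using Raven.Client;                                  // IDocumentStore (assumed namespace)
using Raven.Client.Document;                         // Subscription<T> (assumed namespace)

// Hypothetical document type used only for illustration.
public class Order
{
    public string Id { get; set; }
}

public static class OpeningStrategySketch
{
    public static Subscription<Order> OpenOrWait(IDocumentStore store, long id)
    {
        try
        {
            // Succeeds only if no other client is currently connected.
            return store.Subscriptions.Open<Order>(id, new SubscriptionConnectionOptions
            {
                Strategy = SubscriptionOpeningStrategy.OpenIfFree
            });
        }
        catch (SubscriptionInUseException)
        {
            // Another client holds the subscription: wait until it is released.
            return store.Subscriptions.Open<Order>(id, new SubscriptionConnectionOptions
            {
                Strategy = SubscriptionOpeningStrategy.WaitForFree
            });
        }
    }
}
```

Opening with WaitForFree directly would give much the same result. The two-step version only makes the busy case visible to the caller, for example for logging.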

Error handling

By default, the data subscription does not tolerate processing errors (IgnoreSubscribersErrors: false). If any subscription handler fails, the subscription stops pulling documents and closes the connection immediately. If you set IgnoreSubscribersErrors to true, it ignores errors raised by handlers and keeps retrieving subsequent docs.
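
When errors are ignored, a failing handler no longer stops the subscription, so it can be useful to log failures inside the handler. A minimal sketch, assuming the Reactive Extensions package for the delegate-based Subscribe, follows. The Order class, the method name, and the process delegate are hypothetical, and the namespaces are assumptions.

```csharp
using System;
using Raven.Abstractions.Data;   // SubscriptionConnectionOptions (assumed namespace)
using Raven.Client;              // IDocumentStore (assumed namespace)

// Hypothetical document type used only for illustration.
public class Order
{
    public string Id { get; set; }
}

public static class TolerantHandlersSketch
{
    public static void Run(IDocumentStore store, long id, Action<Order> process)
    {
        var orders = store.Subscriptions.Open<Order>(id, new SubscriptionConnectionOptions
        {
            // Handler exceptions will not close the subscription connection.
            IgnoreSubscribersErrors = true
        });

        orders.Subscribe(order =>
        {
            try
            {
                process(order);
            }
            catch (Exception e)
            {
                // Log the failure; with the setting above the subscription
                // ignores the rethrown exception and continues with the next docs.
                Console.Error.WriteLine("Failed to process " + order.Id + ": " + e.Message);
                throw;
            }
        });
    }
}
```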

Acknowledgment timeout handling

Behind the scenes, once you have successfully processed a batch, the subscription sends a confirmation to the server. The server keeps track of the last processed and acknowledged document. If subscription handlers don't process the batch within the specified AcknowledgmentTimeout, the server resends the whole batch. You will get the same documents over and over until you have successfully processed them.
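
Because a batch can be redelivered, a handler may see the same document more than once. One possible way to cope, sketched below, is to wrap the handler so that repeated deliveries are skipped. This is not part of the client API: IdempotentHandler and Order are hypothetical names, and the in-memory set does not survive a process restart.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical document type used only for illustration.
public class Order
{
    public string Id { get; set; }
}

// Wraps a handler so that a redelivered document is processed only once.
public class IdempotentHandler
{
    private readonly HashSet<string> processedIds = new HashSet<string>();
    private readonly Action<Order> inner;

    public IdempotentHandler(Action<Order> inner)
    {
        this.inner = inner;
    }

    public int ProcessedCount
    {
        get { return processedIds.Count; }
    }

    public void Handle(Order order)
    {
        // Skip documents that were already handled in an earlier delivery.
        if (processedIds.Contains(order.Id))
            return;

        inner(order);

        // Record the id only after success, so a failed document
        // is attempted again when the batch is resent.
        processedIds.Add(order.Id);
    }
}
```

With a typed subscription this could be attached as orders.Subscribe(handler.Handle). The sketch keys on the document id alone, which suits work that must happen once per document. If updated versions of a document should be processed again, the key would need to include a version marker as well.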

Crash handling

Tracking the last acknowledged Etag allows the data subscription to handle crash scenarios. If there is a crash, the server knows which documents have already been processed. If the client crashes midway, the database simply resends the relevant documents when you open the subscription again. The data subscription automatically retries opening the subscription connection every 15 seconds if it gets lost.

Example I

var orders = store.Subscriptions.Open<Order>(id, new SubscriptionConnectionOptions()
{
	BatchOptions = new SubscriptionBatchOptions()
	{
		MaxDocCount = 16*1024,
		MaxSize = 4*1024*1024,
		AcknowledgmentTimeout = TimeSpan.FromMinutes(3)
	},
	IgnoreSubscribersErrors = false,
	ClientAliveNotificationInterval = TimeSpan.FromSeconds(30)
});

Processing documents

Opening a subscription returns a Subscription<T> or Subscription<RavenJObject> instance. It implements the IObservable interface, so you can use Reactive Extensions and subscribe to the incoming documents in order to process them. You will also continue to receive documents that were added after you opened the subscription, because under the hood the Changes API is used to get notifications about any document updates.

Example II

orders.Subscribe(x =>
{
	GenerateInvoice(x);
});

orders.Subscribe(x =>
{
	if(x.RequireAt > DateTime.Now)
		SendReminder(x.Employee, x.Id);
});

Example III

You may want to manage subscription handlers dynamically. The returned subscriber object is an IDisposable; to detach it from the subscription, just call Dispose on it:

var subscriber = orders.Subscribe(x => { });

subscriber.Dispose();

No subscriber attached

The data subscription stops pulling docs if there is no subscriber attached.