How to open a data subscription?
Once you have a subscription identifier, you can open the subscription. Together with the id you need to specify some details about the subscription connection:
Subscription<RavenJObject> Open(long id, SubscriptionConnectionOptions options, string database = null);
Subscription<T> Open<T>(long id, SubscriptionConnectionOptions options, string database = null);
Parameters | | |
---|---|---|
id | long | A data subscription identifier. |
options | SubscriptionConnectionOptions | Connection options. |
Return value | |
---|---|
Subscription<RavenJObject> / Subscription<T> | Subscription instance. |
There are two methods for opening a subscription. The first one deals with documents belonging to different collections, so the results are returned as RavenJObject objects. The second one returns a strongly typed subscription, where the retrieved documents are converted to the given type.
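As a rough illustration of the untyped overload, the sketch below opens a subscription as Subscription<RavenJObject> and reads one property from each document. The namespaces in the using directives, the "Name" property and the class name are assumptions made for the example; `store` is taken to be an initialized document store and `id` an existing subscription identifier.

```csharp
using System;
using Raven.Abstractions.Data;   // SubscriptionConnectionOptions (assumed namespace)
using Raven.Client;              // IDocumentStore
using Raven.Json.Linq;           // RavenJObject

public static class UntypedSubscriptionSketch
{
    public static IDisposable Attach(IDocumentStore store, long id)
    {
        // Untyped overload: documents from any collection arrive as RavenJObject.
        var documents = store.Subscriptions.Open(id, new SubscriptionConnectionOptions());

        // "Name" is a hypothetical property; Value<string> yields null if it is missing.
        return documents.Subscribe((RavenJObject doc) =>
            Console.WriteLine(doc.Value<string>("Name")));
    }
}
```

The typed overload, Open<T>, is used the same way, except that each document is converted to T before it reaches the handler.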
Only a single subscription consumer is allowed at a time
There can be only a single open subscription connection per subscription. An attempt to open a subscription that is already open will result in an exception being thrown.
Documents are sent to a client in batches. SubscriptionConnectionOptions has a BatchOptions property where you can specify:
- MaxDocCount - max number of docs that can be sent in a single batch (default: 4096),
- MaxSize - max total batch size in bytes (default: null - no size limit),
- AcknowledgmentTimeout - max time within which the subscription needs to confirm that the batch has been successfully processed (default: 1 minute).
Additionally, the connection options have the following settings:
- IgnoreSubscribersErrors - determines if subscription should ignore errors thrown by subscription handlers (default: false),
- ClientAliveNotificationInterval - specifies how often the subscription sends heartbeats to the server (default: 2 minutes). The server keeps the subscription open as long as the connected client keeps sending these alive notifications; after two undelivered notifications another client is allowed to connect,
- Strategy - the enum that represents the subscription opening strategy. There are four strategies available:
    - OpenIfFree - the client will successfully open a subscription only if there isn't any other currently connected client. Otherwise it will end up with SubscriptionInUseException,
    - TakeOver - the connecting client will successfully open a subscription even if there is another active consumer of the subscription. If the new client takes over the subscription, the existing one will get rejected. The subscription will always be processed by the last connected client,
    - ForceAndKeep - the client opening a subscription with the forced strategy set will always get it and keep it open until another client with the same strategy connects,
    - WaitForFree - if the client currently cannot open the subscription because it is used by another client, it will subscribe to the Changes API to be notified about subscription status changes. Every time a SubscriptionReleased notification arrives, it will repeat the attempt to open the subscription. After it succeeds in opening, it will process docs as usual.
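The strategies can be tried out with a sketch along the following lines. It opens a subscription with OpenIfFree and catches SubscriptionInUseException when another client is already connected. The namespaces in the using directives, the minimal Order class and the assumption that the exception surfaces directly from Open are illustrative guesses, not confirmed details of the client.

```csharp
using System;
using Raven.Abstractions.Data;                      // SubscriptionConnectionOptions, SubscriptionOpeningStrategy (assumed namespace)
using Raven.Abstractions.Exceptions.Subscriptions;  // SubscriptionInUseException (assumed namespace)
using Raven.Client;                                 // IDocumentStore

// Minimal stand-in for the document type used in this example.
public class Order
{
    public string Id { get; set; }
}

public static class OpeningStrategySketch
{
    // Returns the attached handler, or null if another client holds the subscription.
    public static IDisposable TryAttach(IDocumentStore store, long id)
    {
        try
        {
            var orders = store.Subscriptions.Open<Order>(id, new SubscriptionConnectionOptions
            {
                // Succeeds only if no other client is currently connected.
                Strategy = SubscriptionOpeningStrategy.OpenIfFree
            });

            return orders.Subscribe(order => Console.WriteLine(order.Id));
        }
        catch (SubscriptionInUseException)
        {
            // Another consumer is connected; the caller decides whether to retry later.
            return null;
        }
    }
}
```

A standby consumer that should start processing as soon as the current one disconnects could use SubscriptionOpeningStrategy.WaitForFree instead.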
Error handling
By default the data subscription does not tolerate processing errors (IgnoreSubscribersErrors: false). If any subscription handler fails, the subscription will stop pulling documents and close the subscription connection immediately. If you set IgnoreSubscribersErrors to true, it will ignore an error raised by a handler and keep retrieving the next docs.
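When errors are ignored, a failing handler no longer stops the subscription, so it can be useful to record the failure yourself. The following is a minimal sketch under that assumption: the Process method, the Order class and the logging to standard error are hypothetical, and the namespaces in the using directives are assumed.

```csharp
using System;
using Raven.Abstractions.Data;   // SubscriptionConnectionOptions (assumed namespace)
using Raven.Client;              // IDocumentStore

// Minimal stand-in for the document type used in this example.
public class Order
{
    public string Id { get; set; }
}

public static class ErrorTolerantConsumerSketch
{
    public static IDisposable Attach(IDocumentStore store, long id)
    {
        var orders = store.Subscriptions.Open<Order>(id, new SubscriptionConnectionOptions
        {
            // Handler errors are ignored and the subscription keeps pulling documents.
            IgnoreSubscribersErrors = true
        });

        return orders.Subscribe(order =>
        {
            try
            {
                Process(order);
            }
            catch (Exception e)
            {
                // Log the failure, then rethrow so the subscription applies its own error policy.
                Console.Error.WriteLine("Failed to process {0}: {1}", order.Id, e.Message);
                throw;
            }
        });
    }

    // Hypothetical processing step; replace with real work.
    private static void Process(Order order)
    {
    }
}
```

With IgnoreSubscribersErrors left at its default of false, the same rethrow would cause the subscription connection to be closed.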
Acknowledgment timeout handling
Behind the scenes, once you have successfully processed a batch, the subscription sends a confirmation to the server. The server keeps track of the last processed and acknowledged document. If the subscription handlers don't process the batch within the specified AcknowledgmentTimeout, the server will resend the whole batch. You will get the same documents over and over until you have successfully processed them.
Crash handling
Tracking the last acknowledged Etag allows the data subscription to handle crash scenarios. If there is a crash, the server knows which documents have already been processed. If the client crashed midway, the database will just resend the relevant documents when you open the subscription again. The data subscription automatically retries opening the subscription connection every 15 seconds if it gets lost.
Example I
var orders = store.Subscriptions.Open<Order>(id, new SubscriptionConnectionOptions()
{
    BatchOptions = new SubscriptionBatchOptions()
    {
        MaxDocCount = 16*1024,                             // at most 16,384 docs per batch
        MaxSize = 4*1024*1024,                             // at most 4 MB per batch
        AcknowledgmentTimeout = TimeSpan.FromMinutes(3)    // batch is resent if not confirmed in time
    },
    IgnoreSubscribersErrors = false,                       // a failing handler closes the connection
    ClientAliveNotificationInterval = TimeSpan.FromSeconds(30)
});
Processing documents
The result of opening a subscription is a Subscription<T> or Subscription<RavenJObject> instance. It implements the IObservable interface, so you can use Reactive Extensions and subscribe to the incoming documents in order to process them. You will also keep receiving documents that were added after you opened the subscription, because under the hood the Changes API is used to get notifications about any document updates.
Example II
orders.Subscribe(x =>
{
    GenerateInvoice(x);
});
orders.Subscribe(x =>
{
    if (x.RequireAt > DateTime.Now)
        SendReminder(x.Employee, x.Id);
});
Example III
You may want to manage subscription handlers dynamically. The returned subscriber object is of type IDisposable, so to detach it from the subscription just call Dispose on it:
var subscriber = orders.Subscribe(x => { });
subscriber.Dispose();
No subscriber attached
The data subscription stops pulling docs if there is no subscriber attached.