Data Subscriptions: Consumption API Overview



Subscription worker generation

Subscription worker generation is accessible through the DocumentStore's subscriptions() method, of type DocumentSubscriptions:

SubscriptionWorker<ObjectNode> getSubscriptionWorker(SubscriptionWorkerOptions options);
SubscriptionWorker<ObjectNode> getSubscriptionWorker(SubscriptionWorkerOptions options, String database);

SubscriptionWorker<ObjectNode> getSubscriptionWorker(String subscriptionName);
SubscriptionWorker<ObjectNode> getSubscriptionWorker(String subscriptionName, String database);

<T> SubscriptionWorker<T> getSubscriptionWorker(Class<T> clazz, SubscriptionWorkerOptions options);
<T> SubscriptionWorker<T> getSubscriptionWorker(Class<T> clazz, SubscriptionWorkerOptions options, String database);

<T> SubscriptionWorker<T> getSubscriptionWorker(Class<T> clazz, String subscriptionName);
<T> SubscriptionWorker<T> getSubscriptionWorker(Class<T> clazz, String subscriptionName, String database);

<T> SubscriptionWorker<Revision<T>> getSubscriptionWorkerForRevisions(Class<T> clazz, SubscriptionWorkerOptions options);
<T> SubscriptionWorker<Revision<T>> getSubscriptionWorkerForRevisions(Class<T> clazz, SubscriptionWorkerOptions options, String database);

<T> SubscriptionWorker<Revision<T>> getSubscriptionWorkerForRevisions(Class<T> clazz, String subscriptionName);
<T> SubscriptionWorker<Revision<T>> getSubscriptionWorkerForRevisions(Class<T> clazz, String subscriptionName, String database);
Parameters
subscriptionName String The subscription's name. This parameter appears in more simple overloads allowing to start processing without creating a SubscriptionCreationOptions instance, relying on the default values
options SubscriptionWorkerOptions Contains subscription worker, affecting the interaction of the specific worker with the subscription, but does not affect the subscription's definition
database String Name of the database to look for the data subscription. If null, the default database configured in DocumentStore will be used.
Return value
SubscriptionWorker A created data subscription worker. When returned, the worker is Idle and it will start working only when the run function is called.

SubscriptionWorkerOptions

Note

The only mandatory parameter for SubscriptionWorkerOptions creation is the subscription's name.

Member Type Description
subscriptionName String Returns the subscription name passed to the constructor. This name will be used by the server side to identify the subscription in question.
timeToWaitBeforeConnectionRetry Duration Time to wait before reconnecting, in the case of non-aborting failure during the subscription processing. Default: 5 seconds.
ignoreSubscriberErrors boolean If true, will not abort subscription processing if client code, passed to the run function, throws an unhandled exception. Default: false.
strategy SubscriptionOpeningStrategy
(enum)
Sets the way the server will treat current and/or other clients when they will try to connect. See Workers interplay. Default: OPEN_IF_FREE.
maxDocsPerBatch int Maximum amount of documents that the server will try sending in a batch. If the server will not find "enough" documents, it won't wait and send the amount it found. Default: 4096.
closeWhenNoDocsLeft boolean If true, it performs an "ad-hoc" operation that processes all possible documents, until the server can't find any new documents to send. At that moment, the task returned by the Run function will fail and throw a SubscriptionClosedException exception. Default: false.
sendBufferSizeInBytes int The size in bytes of the TCP socket buffer used for sending data.
Default: 32,768 (32 KiB)
receiveBufferSizeInBytes int The size in bytes of the TCP socket buffer used for receiving data.
Default: 32,768 (32 KiB)

Running subscription worker

After receiving a subscription worker, the subscription worker is still not processing any documents. SubscriptionWorker's run function allows you to start processing worker operations.
The run function receives the client-side code as a consumer that will process the received batches:

CompletableFuture<Void> run(Consumer<SubscriptionBatch<T>> processDocuments);
Parameters
processDocuments Consumer<SubscriptionBatch<T>> Delegate for sync batches processing
Return value
CompletableFuture<Void> Task that is alive as long as the subscription worker is processing or tries processing. If the processing is aborted, the future exits with an exception

SubscriptionBatch<T>

Member Type Description
items List<SubscriptionBatch<T>.Item> Batch's items list.
numberOfItemsInBatch int Amount of items in the batch.
Method Signature Return value Description
openSession() IDocumentSession New document session, that tracks all items and included items of the current batch.

Subscription Worker Connectivity

As long as there is no exception, the worker will continue addressing the same server that the first batch was received from.
If the worker fails to reach that node, it will try to failover to another node from the session's topology list.
The node that the worker succeeded connecting to, will inform the worker which node is currently responsible for data subscriptions.

SubscriptionBatch<T>.Item

Note

if T is ObjectNode, no deserialization will take place

Member Type Description
result T Current batch item.
exceptionMessage String Message of the exception thrown during current document processing in the server side.
id String Current batch item's underlying document ID.
changeVector String Current batch item's underlying document change vector of the current document.
rawResult ObjectNode Current batch item before serialization to T.
rawMetadata ObjectNode Current batch item's underlying document metadata.
metadata IMetadataDictionary Current batch item's underlying metadata values.

SubscriptionWorker<T>

Methods

Method Signature Return Type Description
close() void Aborts subscription worker operation ungracefully by waiting for the task returned by the run function to finish running.
run (multiple overloads) CompletableFuture<Void> Starts the subscription worker work of processing batches, receiving the batch processing delegates (see above).

Events

Event Type\Return type Description
addAfterAcknowledgmentListener Consumer<SubscriptionBatch<T>> (event) Event that is risen after each the server acknowledges batch processing progress.
onSubscriptionConnectionRetry Consumer<Exception> (event) Event that is fired when the subscription worker tries to reconnect to the server after a failure. The event receives as a parameter the exception that interrupted the processing.
onClosed Consumer<SubscriptionWorker<T>> (event) Event that is fired after the subscription worker was disposed.

Properties

Member Type\Return type Description
currentNodeTag String Returns current processing RavenDB server's node tag.
subscriptionName String Returns processed subscription's name.