Data Subscriptions: Consumption API Overview
In this page:
Subscription worker generation
SubscriptionWorkerOptions
Running subscription worker
SubscriptionBatch<T>
SubscriptionWorker<T>
Subscription worker generation
Subscription worker generation is accessible through the DocumentStore
's subscriptions()
method, of type DocumentSubscriptions
:
SubscriptionWorker<ObjectNode> getSubscriptionWorker(SubscriptionWorkerOptions options);
SubscriptionWorker<ObjectNode> getSubscriptionWorker(SubscriptionWorkerOptions options, String database);
SubscriptionWorker<ObjectNode> getSubscriptionWorker(String subscriptionName);
SubscriptionWorker<ObjectNode> getSubscriptionWorker(String subscriptionName, String database);
<T> SubscriptionWorker<T> getSubscriptionWorker(Class<T> clazz, SubscriptionWorkerOptions options);
<T> SubscriptionWorker<T> getSubscriptionWorker(Class<T> clazz, SubscriptionWorkerOptions options, String database);
<T> SubscriptionWorker<T> getSubscriptionWorker(Class<T> clazz, String subscriptionName);
<T> SubscriptionWorker<T> getSubscriptionWorker(Class<T> clazz, String subscriptionName, String database);
<T> SubscriptionWorker<Revision<T>> getSubscriptionWorkerForRevisions(Class<T> clazz, SubscriptionWorkerOptions options);
<T> SubscriptionWorker<Revision<T>> getSubscriptionWorkerForRevisions(Class<T> clazz, SubscriptionWorkerOptions options, String database);
<T> SubscriptionWorker<Revision<T>> getSubscriptionWorkerForRevisions(Class<T> clazz, String subscriptionName);
<T> SubscriptionWorker<Revision<T>> getSubscriptionWorkerForRevisions(Class<T> clazz, String subscriptionName, String database);
Parameters | ||
---|---|---|
subscriptionName | String |
The subscription's name. This parameter appears in more simple overloads allowing to start processing without creating a SubscriptionCreationOptions instance, relying on the default values |
options | SubscriptionWorkerOptions |
Contains subscription worker, affecting the interaction of the specific worker with the subscription, but does not affect the subscription's definition |
database | String |
Name of the database to look for the data subscription. If null , the default database configured in DocumentStore will be used. |
Return value | |
---|---|
SubscriptionWorker |
A created data subscription worker. When returned, the worker is Idle and it will start working only when the run function is called. |
SubscriptionWorkerOptions
Note
The only mandatory parameter for SubscriptionWorkerOptions creation is the subscription's name.
Member | Type | Description |
---|---|---|
subscriptionName | String |
Returns the subscription name passed to the constructor. This name will be used by the server side to identify the subscription in question. |
timeToWaitBeforeConnectionRetry | Duration |
Time to wait before reconnecting, in the case of non-aborting failure during the subscription processing. Default: 5 seconds. |
ignoreSubscriberErrors | boolean |
If true, will not abort subscription processing if client code, passed to the run function, throws an unhandled exception. Default: false. |
strategy | SubscriptionOpeningStrategy (enum) |
Sets the way the server will treat current and/or other clients when they will try to connect. See Workers interplay. Default: OPEN_IF_FREE . |
maxDocsPerBatch | int |
Maximum amount of documents that the server will try sending in a batch. If the server will not find "enough" documents, it won't wait and send the amount it found. Default: 4096. |
closeWhenNoDocsLeft | boolean |
If true, it performs an "ad-hoc" operation that processes all possible documents, until the server can't find any new documents to send. At that moment, the task returned by the Run function will fail and throw a SubscriptionClosedException exception. Default: false. |
sendBufferSizeInBytes | int |
The size in bytes of the TCP socket buffer used for sending data. Default: 32,768 (32 KiB) |
receiveBufferSizeInBytes | int |
The size in bytes of the TCP socket buffer used for receiving data. Default: 32,768 (32 KiB) |
Running subscription worker
After receiving a subscription worker, the subscription worker is still not processing any documents. SubscriptionWorker's run
function allows you to start processing worker operations.
The run
function receives the client-side code as a consumer that will process the received batches:
CompletableFuture<Void> run(Consumer<SubscriptionBatch<T>> processDocuments);
Parameters | ||
---|---|---|
processDocuments | Consumer<SubscriptionBatch<T>> |
Delegate for sync batches processing |
Return value | ||
---|---|---|
CompletableFuture<Void> |
Task that is alive as long as the subscription worker is processing or tries processing. If the processing is aborted, the future exits with an exception |
SubscriptionBatch<T>
Member | Type | Description |
---|---|---|
items | List<SubscriptionBatch<T>.Item> |
Batch's items list. |
numberOfItemsInBatch | int |
Amount of items in the batch. |
Method Signature | Return value | Description |
---|---|---|
openSession() | IDocumentSession |
New document session, that tracks all items and included items of the current batch. |
Subscription Session characteristics
Session will be created by the same document store that created the worker, therefore will receive the same configurations as any other session created by the store.
However, in order to maintain consistency, the session will address the same server that the batch was received from.
It won't try to fail over to another server. It might also fail if the subscription worker changes the node it communicates with.
Such event could happen if the subscription worker starts again to address its original node after a fallback occurrence.
If such failure occurs, the subscription processing will be stopped, and will have to be restarted, as shown here
SubscriptionBatch<T>.Item
Note
if T is ObjectNode
, no deserialization will take place
Member | Type | Description |
---|---|---|
result | T |
Current batch item. |
exceptionMessage | String |
Message of the exception thrown during current document processing in the server side. |
id | String |
Current batch item's underlying document ID. |
changeVector | String |
Current batch item's underlying document change vector of the current document. |
rawResult | ObjectNode |
Current batch item before serialization to T . |
rawMetadata | ObjectNode |
Current batch item's underlying document metadata. |
metadata | IMetadataDictionary |
Current batch item's underlying metadata values. |
SubscriptionWorker<T>
Methods
Method Signature | Return Type | Description |
---|---|---|
close() | void |
Aborts subscription worker operation ungracefully by waiting for the task returned by the run function to finish running. |
run (multiple overloads) | CompletableFuture<Void> |
Starts the subscription worker work of processing batches, receiving the batch processing delegates (see above). |
Events
Event | Type\Return type | Description |
---|---|---|
addAfterAcknowledgmentListener | Consumer<SubscriptionBatch<T>> (event) |
Event that is risen after each the server acknowledges batch processing progress. |
onSubscriptionConnectionRetry | Consumer<Exception> (event) |
Event that is fired when the subscription worker tries to reconnect to the server after a failure. The event receives as a parameter the exception that interrupted the processing. |
onClosed | Consumer<SubscriptionWorker<T>> (event) |
Event that is fired after the subscription worker was disposed. |
Properties
Member | Type\Return type | Description |
---|---|---|
currentNodeTag | String |
Returns current processing RavenDB server's node tag. |
subscriptionName | String |
Returns processed subscription's name. |