Consume Subscriptions API
Create the subscription worker
Subscription worker generation is accessible through the DocumentStore
's subscriptions()
method, of type DocumentSubscriptions
:
SubscriptionWorker<ObjectNode> getSubscriptionWorker(SubscriptionWorkerOptions options);
SubscriptionWorker<ObjectNode> getSubscriptionWorker(SubscriptionWorkerOptions options, String database);
SubscriptionWorker<ObjectNode> getSubscriptionWorker(String subscriptionName);
SubscriptionWorker<ObjectNode> getSubscriptionWorker(String subscriptionName, String database);
<T> SubscriptionWorker<T> getSubscriptionWorker(Class<T> clazz, SubscriptionWorkerOptions options);
<T> SubscriptionWorker<T> getSubscriptionWorker(Class<T> clazz, SubscriptionWorkerOptions options, String database);
<T> SubscriptionWorker<T> getSubscriptionWorker(Class<T> clazz, String subscriptionName);
<T> SubscriptionWorker<T> getSubscriptionWorker(Class<T> clazz, String subscriptionName, String database);
<T> SubscriptionWorker<Revision<T>> getSubscriptionWorkerForRevisions(Class<T> clazz, SubscriptionWorkerOptions options);
<T> SubscriptionWorker<Revision<T>> getSubscriptionWorkerForRevisions(Class<T> clazz, SubscriptionWorkerOptions options, String database);
<T> SubscriptionWorker<Revision<T>> getSubscriptionWorkerForRevisions(Class<T> clazz, String subscriptionName);
<T> SubscriptionWorker<Revision<T>> getSubscriptionWorkerForRevisions(Class<T> clazz, String subscriptionName, String database);
Parameter | ||
---|---|---|
subscriptionName | String |
The subscription's name. This parameter appears in more simple overloads allowing to start processing without creating a SubscriptionCreationOptions instance, relying on the default values |
options | SubscriptionWorkerOptions |
Options that affect how the worker interacts with the subscription. These options do not alter the definition of the subscription itself. |
database | String |
The name of the database where the subscription task resides. If null , the default database configured in DocumentStore will be used. |
Return value | |
---|---|
SubscriptionWorker |
The subscription worker that has been created. Initially, it is idle and will only start processing documents when the run function is called. |
SubscriptionWorkerOptions
When creating a worker with SubscriptionWorkerOptions
, the only mandatory property is subscriptionName
.
All other parameters are optional and will default to their respective default values if not specified.
Member | Type | Description |
---|---|---|
subscriptionName | String |
The name of the subscription to which the worker will connect. |
timeToWaitBeforeConnectionRetry | Duration |
The time to wait before attempting to reconnect after a non-aborting failure during subscription processing. Default: 5 seconds. |
ignoreSubscriberErrors | boolean |
Determines if subscription processing is aborted when the worker's batch-handling code throws an unhandled exception.true – subscription processing will continue.false (Default) – subscription processing will be aborted. |
strategy | SubscriptionOpeningStrategy (enum) |
Configures how the server handles connection attempts from workers to a specific subscription task. Learn more in worker strategies. Default: OPEN_IF_FREE . |
maxDocsPerBatch | int |
The maximum number of documents that the server will try to retrieve and send to the client in a batch. If the server doesn't find as many documents as specified, it will send the documents it has found without waiting. Default: 4096. |
closeWhenNoDocsLeft | boolean |
Determines whether the subscription connection closes when no new documents are available.true – The subscription worker processes all available documents and stops when none remain, at which point the run method throws a SubscriptionClosedException .Useful for ad-hoc, one-time processing. false (Default) – The subscription worker remains active, waiting for new documents. |
sendBufferSizeInBytes | int |
The size in bytes of the TCP socket buffer used for sending data. Default: 32,768 bytes (32 KiB). |
receiveBufferSizeInBytes | int |
The size in bytes of the TCP socket buffer used for receiving data. Default: 4096 (4 KiB). |
Run the subscription worker
After creating a subscription worker, the subscription worker is still not processing any documents.
To start processing, you need to call the run
method of the SubscriptionWorker.
The run
function takes a delegate, which is your client-side code responsible for processing the received document batches.
CompletableFuture<Void> run(Consumer<SubscriptionBatch<T>> processDocuments);
Parameter | ||
---|---|---|
processDocuments | Consumer<SubscriptionBatch<T>> |
Delegate for sync batches processing |
Return value | ||
---|---|---|
CompletableFuture<Void> |
Task that is alive as long as the subscription worker is processing or tries processing. If the processing is aborted, the future exits with an exception |
SubscriptionBatch<T>
Member | Type | Description |
---|---|---|
items | List<SubscriptionBatch<T>.Item> |
List of items in the batch. |
numberOfItemsInBatch | int |
Number of items in the batch. |
Method Signature | Return value | Description |
---|---|---|
openSession() | IDocumentSession |
New document session, that tracks all items and included items of the current batch. |
Subscription worker connectivity
As long as there is no exception, the worker will continue addressing the same
server that the first batch was received from.
If the worker fails to reach that node, it will try to failover to another node
from the session's topology list.
The node that the worker succeeded connecting to, will inform the worker which
node is currently responsible for data subscriptions.
SubscriptionBatch<T>.Item
Note
if T is ObjectNode
, no deserialization will take place
Member | Type | Description |
---|---|---|
result | T |
Current batch item. |
exceptionMessage | String |
Message of the exception thrown during current document processing in the server side. |
id | String |
Current batch item's underlying document ID. |
changeVector | String |
Current batch item's underlying document change vector of the current document. |
rawResult | ObjectNode |
Current batch item before serialization to T . |
rawMetadata | ObjectNode |
Current batch item's underlying document metadata. |
metadata | IMetadataDictionary |
Current batch item's underlying metadata values. |
SubscriptionWorker<T>
Methods
Method Signature | Return Type | Description |
---|---|---|
close() | void |
Aborts subscription worker operation ungracefully by waiting for the task returned by the run function to finish running. |
run (multiple overloads) | CompletableFuture<Void> |
Call run to begin the worker's batch processing.Pass the batch processing delegates to this method (see above). |
Events
Event | Type\Return type | Description |
---|---|---|
addAfterAcknowledgmentListener | Consumer<SubscriptionBatch<T>> (event) |
Event that is risen after each the server acknowledges batch processing progress. |
onSubscriptionConnectionRetry | Consumer<Exception> (event) |
Event that is fired when the subscription worker tries to reconnect to the server after a failure. The event receives as a parameter the exception that interrupted the processing. |
onClosed | Consumer<SubscriptionWorker<T>> (event) |
Event that is fired after the subscription worker was disposed. |
Properties
Member | Type\Return type | Description |
---|---|---|
currentNodeTag | String |
The node tag of the current RavenDB server handling the subscription. |
subscriptionName | String |
The name of the currently processed subscription. |