Consume Subscriptions API
Create the subscription worker
A subscription worker can be created using the following GetSubscriptionWorker
methods available through the Subscriptions
property of the DocumentStore
.
Note: Simply creating the worker is insufficient;
after creating the worker, you need to run the subscription worker to initiate document processing.
SubscriptionWorker<dynamic> GetSubscriptionWorker(
string subscriptionName, string database = null);
SubscriptionWorker<dynamic> GetSubscriptionWorker(
SubscriptionWorkerOptions options, string database = null);
SubscriptionWorker<T> GetSubscriptionWorker<T>(
string subscriptionName, string database = null) where T : class;
SubscriptionWorker<T> GetSubscriptionWorker<T>(
SubscriptionWorkerOptions options, string database = null) where T : class;
Parameter | Type | Description |
---|---|---|
subscriptionName | string |
The name of the subscription to which the worker will connect. |
options | SubscriptionWorkerOptions |
Options that affect how the worker interacts with the subscription. These options do not alter the definition of the subscription itself. |
database | string |
The name of the database where the subscription task resides. If null , the default database configured in DocumentStore will be used. |
Return value | |
---|---|
SubscriptionWorker |
The subscription worker that has been created. Initially, it is idle and will only start processing documents when the Run function is called. |
SubscriptionWorkerOptions
public class SubscriptionWorkerOptions
{
public string SubscriptionName { get; }
public int MaxDocsPerBatch { get; set; }
public int SendBufferSizeInBytes { get; set; }
public int ReceiveBufferSizeInBytes { get; set; }
public bool IgnoreSubscriberErrors { get; set; }
public bool CloseWhenNoDocsLeft { get; set; }
public TimeSpan TimeToWaitBeforeConnectionRetry { get; set; }
public TimeSpan ConnectionStreamTimeout { get; set; }
public TimeSpan MaxErroneousPeriod { get; set; }
public SubscriptionOpeningStrategy Strategy { get; set; }
}
When creating a worker with SubscriptionWorkerOptions
, the only mandatory property is SubscriptionName
.
All other parameters are optional and will default to their respective default values if not specified.
Member | Type | Description |
---|---|---|
SubscriptionName | string |
The name of the subscription to which the worker will connect. |
MaxDocsPerBatch | int |
The maximum number of documents that the server will try to retrieve and send to the client in a batch. If the server doesn't find as many documents as specified, it will send the documents it has found without waiting. Default: 4096. |
SendBufferSizeInBytes | int |
The size in bytes of the TCP socket buffer used for sending data. Default: 32,768 bytes (32 KiB). |
ReceiveBufferSizeInBytes | int |
The size in bytes of the TCP socket buffer used for receiving data. Default: 4096 (4 KiB). |
IgnoreSubscriberErrors | bool |
Determines if subscription processing is aborted when the worker's batch-handling code throws an unhandled exception.true – subscription processing will continue.false (Default) – subscription processing will be aborted. |
CloseWhenNoDocsLeft | bool |
Determines whether the subscription connection closes when no new documents are available.true – The subscription worker processes all available documents and stops when none remain, at which point the Run method throws a SubscriptionClosedException .Useful for ad-hoc, one-time processing. false (Default) – The subscription worker remains active, waiting for new documents. |
TimeToWaitBeforeConnectionRetry | TimeSpan |
The time to wait before attempting to reconnect after a non-aborting failure during subscription processing. Default: 5 seconds. |
MaxErroneousPeriod | TimeSpan |
The maximum amount of time a subscription connection can remain in an erroneous state before it is terminated. Default: 5 minutes. |
Strategy | SubscriptionOpeningStrategy |
This enum configures how the server handles connection attempts from workers to a specific subscription task. Default: OpenIfFree . |
Learn more about SubscriptionOpeningStrategy
in worker strategies.
public enum SubscriptionOpeningStrategy
{
// Connect if no other worker is connected
OpenIfFree,
// Take over the connection
TakeOver,
// Wait for currently connected worker to disconnect
WaitForFree,
// Connect concurrently
Concurrent
}
Run the subscription worker
After creating a subscription worker, the subscription worker is still not processing any documents.
To start processing, you need to call the Run
method of the SubscriptionWorker.
The Run
function takes a delegate, which is your client-side code responsible for processing the received document batches.
Task Run(Action<SubscriptionBatch<T>> processDocuments,
CancellationToken ct = default(CancellationToken));
Task Run(Func<SubscriptionBatch<T>, Task> processDocuments,
CancellationToken ct = default(CancellationToken));
Parameter | Type | Description |
---|---|---|
processDocuments | Action<SubscriptionBatch<T>> |
Delegate for sync batches processing. |
processDocuments | Func<SubscriptionBatch<T>, Task> |
Delegate for async batches processing. |
ct | CancellationToken |
Cancellation token used in order to halt the worker operation. |
Return value | |
---|---|
Task |
Task that is alive as long as the subscription worker is processing or tries processing. If the processing is aborted, the task exits with an exception. |
SubscriptionBatch<T>
Member | Type | Description |
---|---|---|
Items | List<SubscriptionBatch<T>.Item> |
List of items in the batch. See SubscriptionBatch<T>.Item below. |
NumberOfItemsInBatch | int |
Number of items in the batch. |
Method Signature | Return value | Description |
---|---|---|
OpenSession() | IDocumentSession |
Open a new document session that tracks all items and their included items within the current batch. |
OpenAsyncSession() | IAsyncDocumentSession |
Open a new asynchronous document session that tracks all items and their included items within the current batch. |
Subscription worker connectivity
As long as there is no exception, the worker will continue addressing the same server that the first batch was received from.
If the worker fails to reach that node, it will try to failover to another node from the session's topology list.
The node that the worker succeeded connecting to, will inform the worker which node is currently responsible for data subscriptions.
SubscriptionBatch<T>.Item
This class represents a single item in a subscription batch results.
public struct Item
{
public T Result { get; internal set; }
public string ExceptionMessage { get; internal set; }
public string Id { get; internal set; }
public string ChangeVector { get; internal set; }
public bool Projection { get; internal set; }
public bool Revision { get; internal set; }
public BlittableJsonReaderObject RawResult { get; internal set; }
public BlittableJsonReaderObject RawMetadata { get; internal set; }
public IMetadataDictionary Metadata { get; internal set; }
}
Member | Type | Description |
---|---|---|
Result | T |
The current batch item. If T is BlittableJsonReaderObject , no deserialization will take place. |
ExceptionMessage | string |
The exception message thrown during current document processing in the server side. |
Id | string |
The document ID of the underlying document for the current batch item. |
ChangeVector | string |
The change vector of the underlying document for the current batch item. |
RawResult | BlittableJsonReaderObject |
Current batch item before serialization to T . |
RawMetadata | BlittableJsonReaderObject |
Current batch item's underlying document metadata. |
Metadata | IMetadataDictionary |
Current batch item's underlying metadata values. |
This class should only be used within the subscription's Run
delegate.
Using it outside this scope may cause unexpected behavior.
SubscriptionWorker<T>
Methods
Method Signature | Return Type | Description |
---|---|---|
Dispose() | void |
Aborts subscription worker operation ungracefully by waiting for the task returned by the Run function to finish running. |
DisposeAsync() | Task |
Async version of Dispose() . |
Dispose(bool waitForSubscriptionTask) | void |
Aborts the subscription worker, but allows deciding whether to wait for the Run function task or not. |
DisposeAsync(bool waitForSubscriptionTask) | Task |
Async version of DisposeAsync(bool waitForSubscriptionTask) . |
Run (multiple overloads) | Task |
Call Run to begin the worker's batch processing.Pass the batch processing delegates to this method (see above). |
Events
Event | Event type | Description |
---|---|---|
AfterAcknowledgment | AfterAcknowledgmentAction |
Triggered after each time the server acknowledges the progress of batch processing. |
OnSubscriptionConnectionRetry | Action<Exception> |
Triggered when the subscription worker attempts to reconnect to the server after a failure. The event receives as a parameter the exception that interrupted the processing. |
OnDisposed | Action<SubscriptionWorker<T>> |
Triggered after the subscription worker is disposed. |
AfterAcknowledgmentAction
Parameter | ||
---|---|---|
batch | SubscriptionBatch<T> |
The batch process which was acknowledged |
Return value | |
---|---|
Task |
Task for which the worker will wait for the event processing to be finished (for async functions, etc.) |
Properties
Member | Type | Description |
---|---|---|
CurrentNodeTag | string |
The node tag of the current RavenDB server handling the subscription. |
SubscriptionName | string |
The name of the currently processed subscription. |
WorkerId | string |
The worker ID. |