Subscription Consumption Examples
-
In this page:
- Client with full exception handling and processing retries
- Worker with a specified batch size
- Worker that operates with a session
- Worker that processes dynamic objects
- Worker that processes a blittable object
- Subscription that ends when no documents are left
- Subscription that uses included documents
- Subscription workers with failover on other nodes
- Primary and secondary workers
Client with full exception handling and processing retries
Here we implement a client that handles exceptions thrown by the worker.
If the exception is recoverable, the client retries creating the worker.

    while (true)
    {
        // Create the worker:
        // ==================
        var options = new SubscriptionWorkerOptions(subscriptionName);

        // Configure the worker:
        // Allow a downtime of up to 2 hours,
        // and wait 2 minutes before reconnecting
        options.MaxErroneousPeriod = TimeSpan.FromHours(2);
        options.TimeToWaitBeforeConnectionRetry = TimeSpan.FromMinutes(2);

        subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(options);

        try
        {
            // Subscribe to connection retry events
            // and log any exceptions that occur during processing
            subscriptionWorker.OnSubscriptionConnectionRetry += exception =>
            {
                Logger.Error("Error during subscription processing: " + subscriptionName,
                    exception);
            };

            // Run the worker:
            // ===============
            await subscriptionWorker.Run(batch =>
            {
                foreach (var item in batch.Items)
                {
                    // Forcefully stop subscription processing if the company ID is "companies/2-A"
                    // and throw an exception to let external logic handle the specific case
                    if (item.Result.Company == "companies/2-A")
                    {
                        // The custom exception thrown from here
                        // will be wrapped by `SubscriberErrorException`
                        throw new UnsupportedCompanyException(
                            "Company ID can't be 'companies/2-A', please fix");
                    }

                    // Process the order document - provide your own logic
                    ProcessOrder(item.Result);
                }
            }, cancellationToken);

            // The Run method will stop if the subscription worker is disposed,
            // exiting the while loop
            return;
        }
        catch (Exception e)
        {
            Logger.Error("Failure in subscription: " + subscriptionName, e);

            // The following exceptions are not recoverable
            if (e is DatabaseDoesNotExistException ||
                e is SubscriptionDoesNotExistException ||
                e is SubscriptionInvalidStateException ||
                e is AuthorizationException)
                throw;

            if (e is SubscriptionClosedException)
                // Subscription probably closed explicitly by admin
                return;

            if (e is SubscriberErrorException se)
            {
                // For UnsupportedCompanyException we want to throw an exception,
                // otherwise, continue processing
                if (se.InnerException is UnsupportedCompanyException)
                {
                    throw;
                }

                // Call continue to skip the current while(true) iteration and try reconnecting
                // in the next one, allowing the worker to process future batches.
                continue;
            }

            // Handle this depending on the subscription opening strategy
            if (e is SubscriptionInUseException)
                continue;

            // Call return to exit the while(true) loop,
            // dispose the worker (via finally), and stop the subscription.
            return;
        }
        finally
        {
            subscriptionWorker.Dispose();
        }
    }

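The example above throws UnsupportedCompanyException without showing its definition. A minimal sketch of such an exception type could look like the following; only the class name comes from the example, and the constructors shown here are an assumption.

```csharp
using System;

// Hypothetical definition of the custom exception thrown in the example above.
// Only the name is taken from the example; the constructors are an assumption.
public class UnsupportedCompanyException : Exception
{
    public UnsupportedCompanyException(string message)
        : base(message)
    {
    }

    public UnsupportedCompanyException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
```

Because the worker wraps exceptions thrown from the batch handler, the catch block in the example inspects InnerException to find this type rather than catching it directly.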
Worker with a specified batch size
Here we create a worker and specify the maximum number of documents the server will send to the worker in each batch.

    var workerWBatch = store.Subscriptions.GetSubscriptionWorker<Order>(
        new SubscriptionWorkerOptions(subscriptionName)
        {
            MaxDocsPerBatch = 20
        });

    _ = workerWBatch.Run(x =>
    {
        // your custom logic
    });

Worker that operates with a session
Here we create a subscription that sends Order documents that do not have a shipping date.
The worker receiving these documents will update the ShippedAt
field value and save the document back to the server via the session.
Note: The session is opened with batch.OpenSession instead of with Store.OpenSession.

    // Create the subscription task on the server:
    // ===========================================
    var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
    {
        Query = @"from Orders as o where o.ShippedAt = null"
    });

    // Create the subscription worker that will consume the documents:
    // ===============================================================
    var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);

    _ = subscriptionWorker.Run(batch =>
    {
        // Open a session with 'batch.OpenSession'
        using (var session = batch.OpenSession())
        {
            foreach (var order in batch.Items.Select(x => x.Result))
            {
                TransferOrderToShipmentCompany(order); // call your custom method
                order.ShippedAt = DateTime.UtcNow;     // update the document field
            }

            // Save the updated Order documents
            session.SaveChanges();
        }
    });

Worker that processes dynamic objects
Here we define a subscription that projects the Order documents into a dynamic format.
The worker processes the dynamic objects it receives.

    // Create the subscription task on the server:
    // ===========================================
    var subscriptionName = "My dynamic subscription";

    store.Subscriptions.Create(new SubscriptionCreationOptions<Order>()
    {
        Name = subscriptionName,
        Projection = order =>
            new { DynamicField_1 = "Company: " + order.Company + " Employee: " + order.Employee }
    });

    // Create the subscription worker that will consume the documents:
    // ===============================================================
    var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker(subscriptionName);

    _ = subscriptionWorker.Run(batch =>
    {
        foreach (var item in batch.Items)
        {
            // Access the dynamic field in the document
            dynamic field = item.Result.DynamicField_1;

            // Call your custom method
            ProcessItem(field);
        }
    });

Worker that processes a blittable object
Here we create a worker that processes documents as low-level blittable objects.
This can be useful in extreme high-performance scenarios, but it may be dangerous because it works directly with unmanaged memory.

    // Create the subscription task on the server:
    // ===========================================
    var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions<Order>
    {
        Projection = x => new
        {
            x.Employee
        }
    });

    // Create the subscription worker that will consume the documents:
    // ===============================================================
    var subscriptionWorker =
        // Specify `BlittableJsonReaderObject` as the generic type parameter
        store.Subscriptions.GetSubscriptionWorker<BlittableJsonReaderObject>(subscriptionName);

    _ = subscriptionWorker.Run(batch =>
    {
        foreach (var item in batch.Items)
        {
            // Access the Employee field within the blittable object
            var employeeField = item.Result["Employee"].ToString();

            ProcessItem(employeeField); // call your custom method
        }
    });

Subscription that ends when no documents are left
Here we create a subscription client that runs until there are no more new documents to process.
This is useful for ad-hoc, single-use processing where the user needs to ensure that all documents are fully processed.

    // Create the subscription task on the server:
    // ===========================================
    var subscriptionName = store.Subscriptions.Create<Order>(
        new SubscriptionCreationOptions<Order>
        {
            Filter = order => order.Lines.Sum(line => line.PricePerUnit * line.Quantity) > 10000,
            Projection = order => new OrderAndCompany
            {
                OrderId = order.Id,
                Company = RavenQuery.Load<Company>(order.Company)
            }
        });

    // Create the subscription worker that will consume the documents:
    // ===============================================================
    var highValueOrdersWorker = store.Subscriptions.GetSubscriptionWorker<OrderAndCompany>(
        new SubscriptionWorkerOptions(subscriptionName)
        {
            // Here we set the worker to stop when there are no more documents left to send.
            // Run will throw SubscriptionClosedException when the worker finishes its job.
            CloseWhenNoDocsLeft = true
        });

    try
    {
        await highValueOrdersWorker.Run(batch =>
        {
            foreach (var item in batch.Items)
            {
                SendThankYouNoteToEmployee(item.Result); // call your custom method
            }
        });
    }
    catch (SubscriptionClosedException)
    {
        // That's expected, no more documents to process
    }

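The projection above targets an OrderAndCompany class that the example does not define. One possible shape is sketched below, using only the two members the projection assigns. The Company class here is a hypothetical stand-in for your own entity, and its properties are assumptions.

```csharp
// Hypothetical stand-in for the Company entity; replace it with your own class.
public class Company
{
    public string Id { get; set; }
    public string Name { get; set; }
}

// Projection result type assumed by the example above.
// The member names match the projection: OrderId and Company.
public class OrderAndCompany
{
    public string OrderId { get; set; }
    public Company Company { get; set; }
}
```

With a type like this, each item.Result passed to SendThankYouNoteToEmployee carries both the order ID and the loaded company data.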
Subscription that uses included documents
Here we create a subscription that, in addition to sending all the Order documents to the worker,
will include all the referenced Product documents in the batch sent to the worker.
When the worker accesses these Product documents, no additional requests will be made to the server.

    // Create the subscription task on the server:
    // ===========================================
    var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
    {
        // Include the referenced Product documents for each Order document
        Query = @"from Orders include Lines[].Product"
    });

    // Create the subscription worker that will consume the documents:
    // ===============================================================
    var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);

    _ = subscriptionWorker.Run(batch =>
    {
        // Open a session via 'batch.OpenSession'
        // in order to access the Product documents
        using (var session = batch.OpenSession())
        {
            foreach (var order in batch.Items.Select(x => x.Result))
            {
                foreach (var orderLine in order.Lines)
                {
                    // Calling Load will not generate a request to the server,
                    // because orderLine.Product was included in the batch
                    var product = session.Load<Product>(orderLine.Product);

                    ProcessOrderAndProduct(order, product); // call your custom method
                }
            }
        }
    });

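The Load call above relies on orderLine.Product holding the ID of a Product document. For reference, here is a sketch of entity classes consistent with how the examples on this page use Order, its lines, and Product. Only the member names are taken from the examples; the property types and the Name property on Product are assumptions, and your own model may differ.

```csharp
using System;
using System.Collections.Generic;

// Assumed entity shapes; only members used by the examples are listed.
public class Order
{
    public string Id { get; set; }
    public string Company { get; set; }       // ID of the related Company document
    public string Employee { get; set; }      // ID of the related Employee document
    public DateTime? ShippedAt { get; set; }  // null until the order is shipped
    public List<OrderLine> Lines { get; set; } = new List<OrderLine>();
}

public class OrderLine
{
    public string Product { get; set; }       // ID of the related Product document
    public decimal PricePerUnit { get; set; }
    public int Quantity { get; set; }
}

public class Product
{
    public string Id { get; set; }
    public string Name { get; set; }          // hypothetical property for illustration
}
```

Storing references as string IDs is what lets the subscription query include Lines[].Product and lets the worker load each product by ID from the session.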
Subscription workers with failover on other nodes
In this configuration, any available node will create a worker.
If the worker fails, another available node will take over.

    var worker = store.Subscriptions.GetSubscriptionWorker<Order>(
        new SubscriptionWorkerOptions(subscriptionName)
        {
            Strategy = SubscriptionOpeningStrategy.WaitForFree
        });

Primary and secondary workers
Here we create two workers:
- The primary worker, with a TakeOver strategy, will take over the other worker and establish the connection.
- The secondary worker, with a WaitForFree strategy, will wait for the first worker to fail (due to machine failure, etc.).
The primary worker:

    while (true)
    {
        // Create a new worker for each attempt, as in the first example
        var primaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(
            new SubscriptionWorkerOptions(subscriptionName)
            {
                Strategy = SubscriptionOpeningStrategy.TakeOver
            });

        try
        {
            await primaryWorker.Run(x =>
            {
                // your logic
            });
        }
        catch (Exception)
        {
            // retry
        }
        finally
        {
            primaryWorker.Dispose();
        }
    }

The secondary worker:

    while (true)
    {
        // Create a new worker for each attempt, as in the first example
        var secondaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(
            new SubscriptionWorkerOptions(subscriptionName)
            {
                Strategy = SubscriptionOpeningStrategy.WaitForFree
            });

        try
        {
            await secondaryWorker.Run(x =>
            {
                // your logic
            });
        }
        catch (Exception)
        {
            // retry
        }
        finally
        {
            secondaryWorker.Dispose();
        }
    }