Data Subscriptions: Subscription Consumption Examples
-
In this page:
- Subscription workers with failover on other nodes
- Worker with a specified batch size
- Client with full exception handling and processing retries
- Subscription that ends when no documents are left
- Worker that processes dynamic objects
- Subscription that works with a session
- Subscription that uses included documents
- Subscription that works with lowest level API
- Subscription workers with a primary and a secondary node
Subscription workers with failover on other nodes
In this configuration, any available node will create a worker.
If the worker fails, another available node will take over.
var worker = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.WaitForFree
    });
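The snippet above only creates the worker; processing starts once `Run` is called. A minimal sketch of how each candidate node might open and run such a worker follows. The class name `FailoverWorkerExample`, the `BuildOptions` helper, and the console output are illustrative assumptions, not part of the RavenDB client.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Subscriptions;

public static class FailoverWorkerExample
{
    // Hypothetical helper: every candidate node uses the same options,
    // so whichever node finds the subscription free becomes the active worker.
    public static SubscriptionWorkerOptions BuildOptions(string subscriptionName) =>
        new SubscriptionWorkerOptions(subscriptionName)
        {
            Strategy = SubscriptionOpeningStrategy.WaitForFree
        };

    public static async Task RunAsync<T>(
        IDocumentStore store, string subscriptionName, CancellationToken token)
        where T : class
    {
        using (var worker = store.Subscriptions.GetSubscriptionWorker<T>(BuildOptions(subscriptionName)))
        {
            // Run completes when the worker is disposed or the token is cancelled,
            // and throws if the subscription fails in a non-recoverable way.
            await worker.Run(batch =>
            {
                foreach (var item in batch.Items)
                    Console.WriteLine($"Processing {item.Id}"); // placeholder for custom logic
            }, token);
        }
    }
}
```

Starting `RunAsync<Order>(store, subscriptionName, token)` on several machines would leave one worker active and the others waiting for the subscription to become free.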
Worker with a specified batch size
Here we create a worker, specifying the maximum batch size we want to receive.
var workerWBatch = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        MaxDocsPerBatch = 20
    });
_ = workerWBatch.Run(x => { /* custom logic */ });
Client with full exception handling and processing retries
Here we implement a client that handles exceptions thrown by a worker and recreates the worker when the exception is recoverable.
while (true)
{
    var options = new SubscriptionWorkerOptions(subscriptionName);

    // allow up to 2 hours of downtime, and wait 2 minutes between reconnection attempts
    options.MaxErroneousPeriod = TimeSpan.FromHours(2);
    options.TimeToWaitBeforeConnectionRetry = TimeSpan.FromMinutes(2);

    var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(options);

    try
    {
        // here we are informed of any exception that causes the worker to retry its connection
        subscriptionWorker.OnSubscriptionConnectionRetry += exception =>
        {
            Logger.Error("Error during subscription processing: " + subscriptionName, exception);
        };

        await subscriptionWorker.Run(async batch =>
        {
            foreach (var item in batch.Items)
            {
                // we want to force close the subscription processing in that case
                // and let the external code decide what to do with that
                if (item.Result.Company == "companies/832-A")
                    throw new UnsupportedCompanyException("Company Id can't be 'companies/832-A', you must fix this");

                await ProcessOrder(item.Result);
            }
        }, cancellationToken);

        // Run will complete normally if you have disposed the subscription
        return;
    }
    catch (Exception e)
    {
        Logger.Error("Failure in subscription: " + subscriptionName, e);

        if (e is DatabaseDoesNotExistException ||
            e is SubscriptionDoesNotExistException ||
            e is SubscriptionInvalidStateException ||
            e is AuthorizationException)
            throw; // not recoverable

        if (e is SubscriptionClosedException)
            // closed explicitly by admin, probably
            return;

        if (e is SubscriberErrorException se)
        {
            // for UnsupportedCompanyException we rethrow;
            // for any other processing error we recreate the worker and continue
            if (se.InnerException is UnsupportedCompanyException)
            {
                throw;
            }

            continue;
        }

        // handle this depending on subscription
        // open strategy (discussed later)
        if (e is SubscriptionInUseException)
            continue;

        return;
    }
    finally
    {
        subscriptionWorker.Dispose();
    }
}
Subscription that ends when no documents are left
Here we create a subscription client that runs only until there are no new documents left to process.
This is useful for ad-hoc, one-time processing that the user wants to be sure runs to completion.
var highValueOrdersWorker = store.Subscriptions.GetSubscriptionWorker<OrderAndCompany>(
    new SubscriptionWorkerOptions(subsId)
    {
        // Here we ask the worker to stop when there are no documents left to send.
        // Will throw SubscriptionClosedException when it finishes its job
        CloseWhenNoDocsLeft = true
    });

try
{
    await highValueOrdersWorker.Run(async batch =>
    {
        foreach (var item in batch.Items)
        {
            await SendThankYouNoteToEmployee(item.Result);
        }
    });
}
catch (SubscriptionClosedException)
{
    // that's expected
}
Worker that processes dynamic objects
Here we create a worker that processes received data as dynamic objects.
var subscriptionName = "My dynamic subscription";

await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions<Order>()
{
    Name = subscriptionName,
    Projection = order => new { DynamicField_1 = "Company: " + order.Company + " Employee: " + order.Employee }
});

// the non-generic worker delivers each item as a dynamic object
var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker(subscriptionName);
_ = subscriptionWorker.Run(async batch =>
{
    foreach (var item in batch.Items)
    {
        await RaiseNotification(item.Result.DynamicField_1);
    }
});
Subscription that works with a session
Here we create a worker that receives all orders without a shipping date, hands them to the shipment mechanism, and updates the ShippedAt field value.
var subscriptionName = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions()
{
    Query = @"from Orders as o where o.ShippedAt = null"
});

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(async batch =>
{
    using (var session = batch.OpenAsyncSession())
    {
        foreach (var order in batch.Items.Select(x => x.Result))
        {
            await TransferOrderToShipmentCompanyAsync(order);
            order.ShippedAt = DateTime.UtcNow;
        }

        // we know that we have at least one order to ship,
        // because the subscription query above has that in its WHERE clause
        await session.SaveChangesAsync();
    }
});
Subscription that uses included documents
Here we create a subscription utilizing the includes feature, by processing Order documents and including all Products of each order.
When processing the subscription, we create a session using the SubscriptionBatch<T> object, and for each order line, we obtain the Product document and process it alongside the Order.
var subscriptionName = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions()
{
    Query = @"from Orders include Lines[].Product"
});

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(async batch =>
{
    using (var session = batch.OpenAsyncSession())
    {
        foreach (var order in batch.Items.Select(x => x.Result))
        {
            foreach (var orderLine in order.Lines)
            {
                // this line won't generate a request, because orderLine.Product was included
                var product = await session.LoadAsync<Product>(orderLine.Product);
                await RaiseNotification(order, product);
            }
        }
    }
});
Subscription that works with lowest level API
Here we create a subscription whose worker receives documents in their blittable representation. This can be useful in extreme high-performance scenarios, but it may be dangerous because it works directly with unmanaged memory.
// keep the name returned by CreateAsync so the worker can open the subscription
var subscriptionId = await store.Subscriptions.CreateAsync(
    new SubscriptionCreationOptions<Order>
    {
        Projection = x => new
        {
            x.Employee
        }
    });

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<BlittableJsonReaderObject>(subscriptionId);
_ = subscriptionWorker.Run(async batch =>
{
    foreach (var item in batch.Items)
    {
        await RaiseNotification(item.Result["Employee"].ToString());
    }
});
Subscription workers with a primary and a secondary node
Here we create two workers:
- The primary worker, set with a TakeOver strategy, will take the lead over the secondary worker.
- The secondary worker, set with a WaitForFree strategy, will take over if the primary worker fails (e.g. due to a machine failure).
The primary worker:
var primaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.TakeOver
    });

while (true)
{
    try
    {
        await primaryWorker.Run(x =>
        {
            // your logic
        });
    }
    catch (Exception)
    {
        // retry
    }
}
The secondary worker:
var secondaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.WaitForFree
    });

while (true)
{
    try
    {
        await secondaryWorker.Run(x =>
        {
            // your logic
        });
    }
    catch (Exception)
    {
        // retry
    }
}
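The `// retry` branches in the loops above retry immediately, which can turn into a tight loop while a failure persists. One possible mitigation is to pause between attempts. The sketch below computes a capped exponential delay; `RetryBackoff` and `DelayFor` are hypothetical names introduced here for illustration and are not part of the RavenDB client.

```csharp
using System;

// Hypothetical helper, not part of the RavenDB client API.
public static class RetryBackoff
{
    // Returns the pause before retry number `attempt` (1-based):
    // baseDelay * 2^(attempt - 1), never more than maxDelay.
    public static TimeSpan DelayFor(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 1)
            throw new ArgumentOutOfRangeException(nameof(attempt), "attempt is 1-based");

        // Clamp the exponent so the intermediate value stays well within double range.
        int exponent = Math.Min(attempt - 1, 30);
        double ticks = baseDelay.Ticks * Math.Pow(2, exponent);

        // Compare before converting back to long, so the cast cannot overflow.
        return ticks >= maxDelay.Ticks ? maxDelay : TimeSpan.FromTicks((long)ticks);
    }
}
```

Inside a `catch` block, a caller could keep an `attempt` counter and write `await Task.Delay(RetryBackoff.DelayFor(++attempt, TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(1)));` before the next iteration, resetting the counter once a batch has been processed successfully.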