Data Subscriptions: Subscription Consumption Examples
In this page:
Worker with a specified batch size
Client with full exception handling and processing retries
Subscription that ends when no documents left
Worker that processes raw objects
Subscription that works with a session
Subscription that uses included documents
Two subscription workers that are waiting for each other
Worker with a specified batch size
Here we create a worker, specifying the maximum batch size we want to receive.
SubscriptionWorkerOptions options = new SubscriptionWorkerOptions(subscriptionName);
options.setMaxDocsPerBatch(20);
SubscriptionWorker<Order> workerWBatch = store.subscriptions().getSubscriptionWorker(Order.class, options);
workerWBatch.run(x -> { /* custom logic */});
Client with full exception handling and processing retries
Here we implement a client that handles exceptions thrown by the worker and re-creates the worker when the exception is recoverable.
while (true) {
    SubscriptionWorkerOptions options = new SubscriptionWorkerOptions(subscriptionName);
    // here we configure that we allow a down time of up to 2 hours,
    // and will wait for 2 minutes for reconnecting
    options.setMaxErroneousPeriod(Duration.ofHours(2));
    options.setTimeToWaitBeforeConnectionRetry(Duration.ofMinutes(2));
    subscriptionWorker = store.subscriptions().getSubscriptionWorker(Order.class, options);
    try {
        // here we are able to be informed of any exception that happens during processing
        subscriptionWorker.addOnSubscriptionConnectionRetry(exception -> {
            logger.error("Error during subscription processing: " + subscriptionName, exception);
        });
        subscriptionWorker.run(batch -> {
            for (SubscriptionBatch.Item<Order> item : batch.getItems()) {
                // we want to force close the subscription processing in that case
                // and let the external code decide what to do with that
                if ("Europe".equalsIgnoreCase(item.getResult().getShipVia())) {
                    throw new IllegalStateException("We cannot ship via Europe");
                }
                processOrder(item.getResult());
            }
        }).get();
        // Run will complete normally if you have disposed the subscription
        return;
    } catch (Exception e) {
        logger.error("Failure in subscription: " + subscriptionName, e);
        e = ExceptionsUtils.unwrapException(e);
        if (e instanceof DatabaseDoesNotExistException ||
            e instanceof SubscriptionDoesNotExistException ||
            e instanceof SubscriptionInvalidStateException ||
            e instanceof AuthorizationException) {
            throw e; // not recoverable
        }
        if (e instanceof SubscriptionClosedException) {
            // closed explicitly by admin, probably
            return;
        }
        if (e instanceof SubscriberErrorException) {
            SubscriberErrorException se = (SubscriberErrorException) e;
            // for IllegalStateException type, we want to throw an exception, otherwise
            // we continue processing
            if (se.getCause() != null && se.getCause() instanceof IllegalStateException) {
                throw e;
            }
            continue;
        }
        // handle this depending on subscription
        // open strategy (discussed later)
        if (e instanceof SubscriptionInUseException) {
            continue;
        }
        return;
    } finally {
        subscriptionWorker.close();
    }
}
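The decision logic in the catch block above can be exercised in isolation. The following is a minimal sketch in plain Java and is not part of the RavenDB client: `RetryDecision`, `unwrap`, and `decide` are hypothetical names, and the JDK exception types only stand in for the subscription exceptions. It shows that a failure surfaced by `get()` arrives wrapped in an `ExecutionException` and has to be unwrapped before it can be classified.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical helper, not part of the RavenDB client.
final class RetryDecision {
    enum Action { RETHROW, STOP, RETRY }

    // Peel off the wrappers that futures add around the real failure.
    static Throwable unwrap(Throwable t) {
        while ((t instanceof ExecutionException || t instanceof CompletionException)
                && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Stand-in classification: the JDK types below only play the role of the
    // subscription exceptions; the mapping itself is an assumption.
    static Action decide(Throwable raw) {
        Throwable e = unwrap(raw);
        if (e instanceof SecurityException) {      // plays an authorization failure
            return Action.RETHROW;
        }
        if (e instanceof IllegalStateException) {  // plays a fatal processing error
            return Action.RETHROW;
        }
        if (e instanceof IOException) {            // plays a transient failure
            return Action.RETRY;
        }
        return Action.STOP;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("We cannot ship via Europe"));
        try {
            failed.get();
        } catch (InterruptedException | ExecutionException e) {
            // The caught type is the wrapper; the decision is made on its cause.
            System.out.println(e.getClass().getSimpleName() + " -> " + decide(e));
        }
    }
}
```

Keeping the classification in one function makes the retry loop itself short: it only has to act on `RETHROW`, `STOP`, or `RETRY`.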
Subscription that ends when no documents left
Here we create a subscription client that runs only until there are no new documents left to process.
This is useful for ad-hoc, single-use processing that must be performed completely.
SubscriptionWorkerOptions options = new SubscriptionWorkerOptions(subsId);
// Here we ask the worker to stop when there are no documents left to send.
// Will throw SubscriptionClosedException when it finishes its job
options.setCloseWhenNoDocsLeft(true);
SubscriptionWorker<OrderAndCompany> highValueOrdersWorker = store
    .subscriptions().getSubscriptionWorker(OrderAndCompany.class, options);
try {
    highValueOrdersWorker.run(batch -> {
        for (SubscriptionBatch.Item<OrderAndCompany> item : batch.getItems()) {
            sendThankYouNoteToEmployee(item.getResult());
        }
    });
} catch (SubscriptionClosedException e) {
    // that's expected
}
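The "run until drained, then signal with an exception" pattern can be pictured with a toy model in plain Java. None of the types below belong to the RavenDB client: `DrainUntilEmpty`, `NoDocsLeftException`, and `awaitDrained` are hypothetical names used only for illustration. The sketch also assumes, as the `.get()` call in the earlier example suggests, that the work runs asynchronously, so the closing exception reaches the caller only when the returned future is awaited.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

// Toy model only: none of these types belong to the RavenDB client.
final class DrainUntilEmpty {
    // Hypothetical stand-in for the exception that signals "nothing left to send".
    static final class NoDocsLeftException extends RuntimeException {
        NoDocsLeftException() { super("no documents left"); }
    }

    // Feeds batches of at most batchSize items to the handler on another thread,
    // then fails the returned future to signal that the source is exhausted.
    static CompletableFuture<Void> run(Queue<String> docs, int batchSize,
                                       Consumer<List<String>> handler) {
        return CompletableFuture.runAsync(() -> {
            while (!docs.isEmpty()) {
                List<String> batch = new ArrayList<>();
                while (batch.size() < batchSize && !docs.isEmpty()) {
                    batch.add(docs.poll());
                }
                handler.accept(batch);
            }
            throw new NoDocsLeftException();
        });
    }

    // Waits for the worker and reports whether it ended for the expected reason.
    static boolean awaitDrained(CompletableFuture<Void> work) {
        try {
            work.join();
            return false; // completed without the end-of-data signal
        } catch (CompletionException e) {
            if (e.getCause() instanceof NoDocsLeftException) {
                return true; // that's expected
            }
            throw e; // anything else is a real failure
        }
    }

    public static void main(String[] args) {
        Queue<String> docs = new ArrayDeque<>(Arrays.asList("orders/1", "orders/2", "orders/3"));
        List<Integer> batchSizes = new ArrayList<>();
        boolean drained = awaitDrained(run(docs, 2, batch -> batchSizes.add(batch.size())));
        System.out.println(drained + " " + batchSizes);
    }
}
```

In this model the expected exception is found as the cause of the wrapper thrown by `join()`, which is why the check inspects `getCause()` rather than catching the exception type directly.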
Worker that processes raw objects
Here we create a worker that processes received data as ObjectNode objects.
String subscriptionName = "My dynamic subscription";
SubscriptionCreationOptions subscriptionCreationOptions = new SubscriptionCreationOptions();
subscriptionCreationOptions.setName(subscriptionName);
subscriptionCreationOptions.setQuery("from Orders as o \n" +
    "select { \n" +
    " DynamicField_1: 'Company:' + o.Company + ' Employee: ' + o.Employee \n" +
    "}");
// create the subscription before opening a worker for it
store.subscriptions().create(subscriptionCreationOptions);
SubscriptionWorker<ObjectNode> worker = store.subscriptions().getSubscriptionWorker(subscriptionName);
worker.run(x -> {
    for (SubscriptionBatch.Item<ObjectNode> item : x.getItems()) {
        ObjectNode result = item.getResult();
        raiseNotification(result.get("DynamicField_1"));
    }
});
Subscription that works with a session
Here we create a worker that receives all orders without a shipping date, lets the shipment mechanism handle them, and updates the ShippedAt field value.
SubscriptionCreationOptions subscriptionCreationOptions = new SubscriptionCreationOptions();
subscriptionCreationOptions.setQuery("from Orders as o where o.ShippedAt = null");
String subscriptionName = store.subscriptions().create(subscriptionCreationOptions);
SubscriptionWorker<Order> subscriptionWorker = store.subscriptions().getSubscriptionWorker(Order.class, subscriptionName);
subscriptionWorker.run(batch -> {
    try (IDocumentSession session = batch.openSession()) {
        for (SubscriptionBatch.Item<Order> orderItem : batch.getItems()) {
            transferOrderToShipmentCompany(orderItem.getResult());
            orderItem.getResult().setShippedAt(new Date());
        }
        // we know that we have at least one order to ship,
        // because the subscription query above has that in its WHERE clause
        session.saveChanges();
    }
});
Subscription that uses included documents
Here we create a subscription utilizing the includes feature, by processing Order documents and including all Products of each order.
When processing the subscription, we create a session using the SubscriptionBatch<T> object, and for each order line, we obtain the Product document and process it alongside the Order.
SubscriptionCreationOptions subscriptionCreationOptions = new SubscriptionCreationOptions();
subscriptionCreationOptions.setQuery("from Orders include Lines[].Product");
String subscriptionName = store.subscriptions().create(subscriptionCreationOptions);
SubscriptionWorker<Order> subscriptionWorker = store.subscriptions().getSubscriptionWorker(Order.class, subscriptionName);
subscriptionWorker.run(batch -> {
    try (IDocumentSession session = batch.openSession()) {
        for (SubscriptionBatch.Item<Order> orderItem : batch.getItems()) {
            Order order = orderItem.getResult();
            for (OrderLine orderLine : order.getLines()) {
                // this line won't generate a request, because orderLine.Product was included
                Product product = session.load(Product.class, orderLine.getProduct());
                raiseProductNotification(order, product);
            }
        }
    }
});
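To see why the `session.load` call above does not cause a round trip, it can help to picture the session as a cache that is pre-populated with the included documents. The sketch below is a toy model in plain Java and not the RavenDB session: `ToySession`, `preload`, and `serverRequests` are hypothetical names, and documents are reduced to strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model only: illustrates the idea of includes, not the RavenDB session.
final class ToySession {
    private final Map<String, String> cache = new HashMap<>();
    private final Function<String, String> server; // hypothetical remote lookup
    private int serverRequests = 0;

    ToySession(Function<String, String> server) {
        this.server = server;
    }

    // Models documents that arrived together with the batch (the "includes").
    void preload(Map<String, String> included) {
        cache.putAll(included);
    }

    // Returns the cached document if present; otherwise asks the server once.
    String load(String id) {
        String doc = cache.get(id);
        if (doc == null) {
            serverRequests++;
            doc = server.apply(id);
            cache.put(id, doc);
        }
        return doc;
    }

    int serverRequests() {
        return serverRequests;
    }

    public static void main(String[] args) {
        ToySession session = new ToySession(id -> "fetched:" + id);
        Map<String, String> included = new HashMap<>();
        included.put("products/1", "Chai");
        session.preload(included);

        session.load("products/1"); // served from the preloaded cache
        session.load("products/2"); // not included, so it costs one request
        System.out.println(session.serverRequests());
    }
}
```

In this model only the document that was not preloaded increments the request counter, which mirrors the comment in the worker code about included products.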
Two subscription workers that are waiting for each other
Here we create two workers:
* The main worker, with the TAKE_OVER strategy, which takes over the subscription from the other worker and takes the lead
* The secondary worker, with the WAIT_FOR_FREE strategy, which waits for the first one to fail (due to machine failure etc.)
The main worker:
SubscriptionWorkerOptions options1 = new SubscriptionWorkerOptions(subscriptionName);
options1.setStrategy(SubscriptionOpeningStrategy.TAKE_OVER);
SubscriptionWorker<Order> worker1 = store.subscriptions().getSubscriptionWorker(Order.class, options1);
while (true) {
    try {
        worker1
            .run(x -> {
                // your logic
            });
    } catch (Exception e) {
        // retry
    }
}
The secondary worker:
SubscriptionWorkerOptions options2 = new SubscriptionWorkerOptions(subscriptionName);
options2.setStrategy(SubscriptionOpeningStrategy.WAIT_FOR_FREE);
SubscriptionWorker<Order> worker2 = store.subscriptions().getSubscriptionWorker(Order.class, options2);
while (true) {
    try {
        worker2
            .run(x -> {
                // your logic
            });
    } catch (Exception e) {
        // retry
    }
}
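How the two strategies interact can be pictured with a toy model of a single consumer slot. This is a simplified sketch in plain Java, not RavenDB's server logic: `SubscriptionSlot`, `connect`, and `disconnect` are hypothetical names, and corner cases (for example two workers that both use TAKE_OVER) are deliberately ignored.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: a single-consumer slot, not RavenDB's server logic.
final class SubscriptionSlot {
    enum Strategy { TAKE_OVER, WAIT_FOR_FREE }

    private String active;                                   // worker currently served
    private final Deque<String> waiting = new ArrayDeque<>(); // workers queued for the slot

    // Returns true if the worker becomes the active consumer right away.
    boolean connect(String worker, Strategy strategy) {
        if (active == null) {
            active = worker;
            return true;
        }
        if (strategy == Strategy.TAKE_OVER) {
            active = worker; // the previous consumer is dropped in this model
            return true;
        }
        waiting.addLast(worker); // WAIT_FOR_FREE: queue up until the slot frees
        return false;
    }

    // Models a failure or shutdown of a worker.
    void disconnect(String worker) {
        if (worker.equals(active)) {
            active = waiting.pollFirst(); // null when nobody is waiting
        } else {
            waiting.remove(worker);
        }
    }

    String active() {
        return active;
    }

    public static void main(String[] args) {
        SubscriptionSlot slot = new SubscriptionSlot();
        slot.connect("secondary", Strategy.WAIT_FOR_FREE); // slot is free, so it is served
        slot.connect("main", Strategy.TAKE_OVER);          // main displaces secondary
        slot.connect("secondary", Strategy.WAIT_FOR_FREE); // secondary retries and now waits
        System.out.println(slot.active());
        slot.disconnect("main");                           // main fails
        System.out.println(slot.active());
    }
}
```

The retry loops in the two worker snippets play the role of the repeated `connect` calls here: the displaced secondary reconnects, waits, and is promoted once the main worker goes away.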