Data Subscriptions: Subscription Consumption Examples
In this page:
Worker with a specified batch size
Client with full exception handling and processing retries
Subscription that ends when no documents left
Worker that processes dynamic objects
Subscription that works with lowest level API
Two subscription workers that are waiting for each other
Worker with a specified batch size
Here we create a worker, specifying the maximum batch size we want to receive.
SubscriptionWorkerOptions options = new SubscriptionWorkerOptions(subscriptionName);
options.setMaxDocsPerBatch(20);
SubscriptionWorker<Order> workerWBatch = store.subscriptions().getSubscriptionWorker(Order.class, options);
workerWBatch.run(x -> { /* custom logic */});
Client with full exception handling and processing retries
Here we implement a client that treats exceptions thrown by worker, and retries creating the worker if an exception is recoverable.
while (true) {
SubscriptionWorkerOptions options = new SubscriptionWorkerOptions(subscriptionName);
// here we configure that we allow a down time of up to 2 hours,
// and will wait for 2 minutes for reconnecting
options.setMaxErroneousPeriod(Duration.ofHours(2));
options.setTimeToWaitBeforeConnectionRetry(Duration.ofMinutes(2));
subscriptionWorker = store.subscriptions().getSubscriptionWorker(Order.class, options);
try {
// here we are able to be informed of any exception that happens during processing
subscriptionWorker.addOnSubscriptionConnectionRetry(exception -> {
logger.error("Error during subscription processing: " + subscriptionName, exception);
});
subscriptionWorker.run(batch -> {
for (SubscriptionBatch.Item<Order> item : batch.getItems()) {
// we want to force close the subscription processing in that case
// and let the external code decide what to do with that
if ("Europe".equalsIgnoreCase(item.getResult().getShipVia())) {
throw new IllegalStateException("We cannot ship via Europe");
}
processOrder(item.getResult());
}
}).get();
// Run will complete normally if you have disposed the subscription
return;
} catch (Exception e) {
logger.error("Failure in subscription: " + subscriptionName, e);
e = ExceptionsUtils.unwrapException(e);
if (e instanceof DatabaseDoesNotExistException ||
e instanceof SubscriptionDoesNotExistException ||
e instanceof SubscriptionInvalidStateException ||
e instanceof AuthorizationException) {
throw e; // not recoverable
}
if (e instanceof SubscriptionClosedException) {
// closed explicitly by admin, probably
return;
}
if (e instanceof SubscriberErrorException) {
SubscriberErrorException se = (SubscriberErrorException) e;
// for IllegalStateException type, we want to throw an exception, otherwise
// we continue processing
if (se.getCause() != null && se.getCause() instanceof IllegalStateException) {
throw e;
}
continue;
}
// handle this depending on subscription
// open strategy (discussed later)
if (e instanceof SubscriptionInUseException) {
continue;
}
return;
} finally {
subscriptionWorker.close();
}
}
Subscription that ends when no documents left
Here we create a subscription client that runs only up to the point there are no more new documents left to process.
This is useful for an ad-hoc single use processing that the user wants to be sure is performed completely.
SubscriptionWorkerOptions options = new SubscriptionWorkerOptions(subsId);
// Here we ask the worker to stop when there are no documents left to send.
// Will throw SubscriptionClosedException when it finishes it's job
options.setCloseWhenNoDocsLeft(true);
SubscriptionWorker<OrderAndCompany> highValueOrdersWorker = store
.subscriptions().getSubscriptionWorker(OrderAndCompany.class, options);
try {
highValueOrdersWorker.run(batch -> {
for (SubscriptionBatch.Item<OrderAndCompany> item : batch.getItems()) {
sendThankYouNoteToEmployee(item.getResult());
}
});
} catch (SubscriptionClosedException e) {
//that's expected
}
Worker that processes raw objects
Here we create a worker that processes received data as ObjectNode objects.
String subscriptionName = "My dynamic subscription";
SubscriptionCreationOptions subscriptionCreationOptions = new SubscriptionCreationOptions();
subscriptionCreationOptions.setName("My dynamic subscription");
subscriptionCreationOptions.setQuery("from Orders as o \n" +
"select { \n" +
" DynamicField_1: 'Company:' + o.Company + ' Employee: ' + o.Employee \n" +
"}");
SubscriptionWorker<ObjectNode> worker = store.subscriptions().getSubscriptionWorker(subscriptionName);
worker.run(x -> {
for (SubscriptionBatch.Item<ObjectNode> item : x.getItems()) {
ObjectNode result = item.getResult();
raiseNotification(result.get("DynamicField_1"));
}
});
Two subscription workers that are waiting for each other
Here we create two workers:
* The main worker with the TAKE_OVER
strategy that will take over the other one and will take the lead
* The secondary worker that will wait for the first one fail (due to machine failure etc.)
The main worker:
SubscriptionWorkerOptions options1 = new SubscriptionWorkerOptions(subscriptionName);
options1.setStrategy(SubscriptionOpeningStrategy.TAKE_OVER);
SubscriptionWorker<Order> worker1 = store.subscriptions().getSubscriptionWorker(Order.class, options1);
while (true) {
try {
worker1
.run(x -> {
// your logic
});
} catch (Exception e) {
// retry
}
}
The secondary worker:
SubscriptionWorkerOptions options2 = new SubscriptionWorkerOptions(subscriptionName);
options2.setStrategy(SubscriptionOpeningStrategy.WAIT_FOR_FREE);
SubscriptionWorker<Order> worker2 = store.subscriptions().getSubscriptionWorker(Order.class, options2);
while (true) {
try {
worker2
.run(x -> {
// your logic
});
} catch (Exception e) {
// retry
}
}