Data Subscriptions: Subscription Consumption Examples



Worker with a specified batch size

Here we create a worker, specifying the maximum batch size we want to receive.

SubscriptionWorkerOptions options = new SubscriptionWorkerOptions(subscriptionName);
options.setMaxDocsPerBatch(20);
SubscriptionWorker<Order> workerWBatch = store.subscriptions().getSubscriptionWorker(Order.class, options);
workerWBatch.run(x -> { /* custom logic */});

Client with full exception handling and processing retries

Here we implement a client that treats exceptions thrown by worker, and retries creating the worker if an exception is recoverable.

while (true) {
    SubscriptionWorkerOptions options = new SubscriptionWorkerOptions(subscriptionName);
    // here we configure that we allow a down time of up to 2 hours,
    // and will wait for 2 minutes for reconnecting

    options.setMaxErroneousPeriod(Duration.ofHours(2));
    options.setTimeToWaitBeforeConnectionRetry(Duration.ofMinutes(2));

    subscriptionWorker = store.subscriptions().getSubscriptionWorker(Order.class, options);

    try {
        // here we are able to be informed of any exception that happens during processing
        subscriptionWorker.addOnSubscriptionConnectionRetry(exception -> {
            logger.error("Error during subscription processing: " + subscriptionName, exception);
        });

        subscriptionWorker.run(batch -> {
            for (SubscriptionBatch.Item<Order> item : batch.getItems()) {
                // we want to force close the subscription processing in that case
                // and let the external code decide what to do with that
                if ("Europe".equalsIgnoreCase(item.getResult().getShipVia())) {
                    throw new IllegalStateException("We cannot ship via Europe");
                }
                processOrder(item.getResult());
            }
        }).get();


        // Run will complete normally if you have disposed the subscription
        return;
    } catch (Exception e) {
        logger.error("Failure in subscription: " + subscriptionName, e);

        e = ExceptionsUtils.unwrapException(e);
        if (e instanceof DatabaseDoesNotExistException ||
            e instanceof SubscriptionDoesNotExistException ||
            e instanceof SubscriptionInvalidStateException ||
            e instanceof AuthorizationException) {
            throw e; // not recoverable
        }

        if (e instanceof SubscriptionClosedException) {
            // closed explicitly by admin, probably
            return;
        }

        if (e instanceof SubscriberErrorException) {
            SubscriberErrorException se = (SubscriberErrorException) e;
            // for IllegalStateException type, we want to throw an exception, otherwise
            // we continue processing
            if (se.getCause() != null && se.getCause() instanceof IllegalStateException) {
                throw e;
            }

            continue;
        }

        // handle this depending on subscription
        // open strategy (discussed later)
        if (e instanceof SubscriptionInUseException) {
            continue;
        }

        return;
    } finally {
        subscriptionWorker.close();
    }
}

Subscription that ends when no documents left

Here we create a subscription client that runs only up to the point there are no more new documents left to process.

This is useful for an ad-hoc single use processing that the user wants to be sure is performed completely.

SubscriptionWorkerOptions options = new SubscriptionWorkerOptions(subsId);

// Here we ask the worker to stop when there are no documents left to send.
// Will throw SubscriptionClosedException when it finishes it's job
options.setCloseWhenNoDocsLeft(true);
SubscriptionWorker<OrderAndCompany> highValueOrdersWorker = store
    .subscriptions().getSubscriptionWorker(OrderAndCompany.class, options);

try {
    highValueOrdersWorker.run(batch -> {
        for (SubscriptionBatch.Item<OrderAndCompany> item : batch.getItems()) {
            sendThankYouNoteToEmployee(item.getResult());
        }
    });
} catch (SubscriptionClosedException e) {
    //that's expected
}

Worker that processes raw objects

Here we create a worker that processes received data as ObjectNode objects.

String subscriptionName = "My dynamic subscription";

SubscriptionCreationOptions subscriptionCreationOptions = new SubscriptionCreationOptions();
subscriptionCreationOptions.setName("My dynamic subscription");
subscriptionCreationOptions.setQuery("from Orders as o \n" +
    "select { \n" +
    "   DynamicField_1: 'Company:' + o.Company + ' Employee: ' + o.Employee \n" +
    "}");

SubscriptionWorker<ObjectNode> worker = store.subscriptions().getSubscriptionWorker(subscriptionName);
worker.run(x -> {
    for (SubscriptionBatch.Item<ObjectNode> item : x.getItems()) {
        ObjectNode result = item.getResult();
        raiseNotification(result.get("DynamicField_1"));
    }
});

Two subscription workers that are waiting for each other

Here we create two workers:
* The main worker with the TAKE_OVER strategy that will take over the other one and will take the lead
* The secondary worker that will wait for the first one fail (due to machine failure etc.)

The main worker:

SubscriptionWorkerOptions options1 = new SubscriptionWorkerOptions(subscriptionName);
options1.setStrategy(SubscriptionOpeningStrategy.TAKE_OVER);
SubscriptionWorker<Order> worker1 = store.subscriptions().getSubscriptionWorker(Order.class, options1);


while (true) {
    try {
        worker1
            .run(x -> {
                // your logic
            });
    } catch (Exception e) {
        // retry
    }
}

The secondary worker:

SubscriptionWorkerOptions options2 = new SubscriptionWorkerOptions(subscriptionName);
options2.setStrategy(SubscriptionOpeningStrategy.WAIT_FOR_FREE);
SubscriptionWorker<Order> worker2 = store.subscriptions().getSubscriptionWorker(Order.class, options2);

while (true) {
    try {
        worker2
            .run(x -> {
                // your logic
            });
    } catch (Exception e) {
        // retry
    }
}