Subscription Consumption Examples



Worker with a specified batch size

Here we create a worker and specify the maximum number of documents the server will send to the worker in each batch.

SubscriptionWorkerOptions options = new SubscriptionWorkerOptions(subscriptionName);
options.setMaxDocsPerBatch(20);
SubscriptionWorker<Order> workerWBatch = store.subscriptions().getSubscriptionWorker(Order.class, options);
workerWBatch.run(x -> { /* custom logic */});
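
To make the effect of a batch size limit concrete, the following stand-alone sketch splits a list of pending items into batches of at most a given size. It uses only the Java standard library and does not call the RavenDB client. The names `BatchSizeSketch` and `partition` are hypothetical, and the limit is treated purely as an upper bound, matching the description above.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSizeSketch {

    // Splits items into consecutive batches holding at most maxDocsPerBatch elements each.
    static <T> List<List<T>> partition(List<T> items, int maxDocsPerBatch) {
        if (maxDocsPerBatch <= 0) {
            throw new IllegalArgumentException("maxDocsPerBatch must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxDocsPerBatch) {
            int end = Math.min(start + maxDocsPerBatch, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> pending = new ArrayList<>();
        for (int i = 1; i <= 45; i++) {
            pending.add(i);
        }
        // With a limit of 20, 45 pending items arrive as batches of 20, 20 and 5.
        for (List<Integer> batch : partition(pending, 20)) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```

In this model only the last batch is smaller than the limit; no batch ever exceeds it.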

Client with full exception handling and processing retries

Here we implement a client that handles exceptions thrown by a worker.
If the exception is recoverable, the client retries creating the worker.

while (true) {
    SubscriptionWorkerOptions options = new SubscriptionWorkerOptions(subscriptionName);
    // allow up to 2 hours of downtime,
    // and wait 2 minutes before each reconnection attempt

    options.setMaxErroneousPeriod(Duration.ofHours(2));
    options.setTimeToWaitBeforeConnectionRetry(Duration.ofMinutes(2));

    subscriptionWorker = store.subscriptions().getSubscriptionWorker(Order.class, options);

    try {
        // log every exception that causes the worker to retry the connection
        subscriptionWorker.addOnSubscriptionConnectionRetry(exception -> {
            logger.error("Error during subscription processing: " + subscriptionName, exception);
        });

        subscriptionWorker.run(batch -> {
            for (SubscriptionBatch.Item<Order> item : batch.getItems()) {
                // in this case we force the subscription processing to stop
                // and let the calling code decide what to do next
                if ("Europe".equalsIgnoreCase(item.getResult().getShipVia())) {
                    throw new IllegalStateException("We cannot ship via Europe");
                }
                processOrder(item.getResult());
            }
        }).get();


        // run() completes normally if the subscription worker was closed
        return;
    } catch (Exception e) {
        logger.error("Failure in subscription: " + subscriptionName, e);

        e = ExceptionsUtils.unwrapException(e);
        if (e instanceof DatabaseDoesNotExistException ||
            e instanceof SubscriptionDoesNotExistException ||
            e instanceof SubscriptionInvalidStateException ||
            e instanceof AuthorizationException) {
            throw e; // not recoverable
        }

        if (e instanceof SubscriptionClosedException) {
            // probably closed explicitly by an admin
            return;
        }

        if (e instanceof SubscriberErrorException) {
            SubscriberErrorException se = (SubscriberErrorException) e;
            // rethrow if the cause is an IllegalStateException;
            // otherwise, retry with a new worker
            if (se.getCause() instanceof IllegalStateException) {
                throw e;
            }

            continue;
        }

        // handle this depending on subscription
        // open strategy (discussed later)
        if (e instanceof SubscriptionInUseException) {
            continue;
        }

        return;
    } finally {
        subscriptionWorker.close();
    }
}
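
The branching in the catch block above can be summarized as a small decision function. The sketch below is a stand-alone illustration: the exception classes are local stand-ins that only borrow the names used in the example, so it compiles without the RavenDB client, and the `Action` enum and `decide` method are hypothetical.

```java
final class RetryDecisionSketch {

    // Stand-ins for the client exceptions named in the example above,
    // declared locally so the sketch compiles without the RavenDB client.
    static class DatabaseDoesNotExistException extends RuntimeException {}
    static class SubscriptionDoesNotExistException extends RuntimeException {}
    static class SubscriptionInvalidStateException extends RuntimeException {}
    static class AuthorizationException extends RuntimeException {}
    static class SubscriptionClosedException extends RuntimeException {}
    static class SubscriptionInUseException extends RuntimeException {}
    static class SubscriberErrorException extends RuntimeException {
        SubscriberErrorException(Throwable cause) { super(cause); }
    }

    enum Action { RETHROW, STOP, RETRY }

    // Mirrors the order of the checks in the catch block above.
    static Action decide(Throwable e) {
        if (e instanceof DatabaseDoesNotExistException
                || e instanceof SubscriptionDoesNotExistException
                || e instanceof SubscriptionInvalidStateException
                || e instanceof AuthorizationException) {
            return Action.RETHROW; // not recoverable
        }
        if (e instanceof SubscriptionClosedException) {
            return Action.STOP; // closed on purpose, nothing to retry
        }
        if (e instanceof SubscriberErrorException) {
            // a failure raised by the processing code itself
            return e.getCause() instanceof IllegalStateException ? Action.RETHROW : Action.RETRY;
        }
        if (e instanceof SubscriptionInUseException) {
            return Action.RETRY;
        }
        return Action.STOP; // anything else ends the loop
    }

    public static void main(String[] args) {
        System.out.println(decide(new SubscriptionInUseException()));
        System.out.println(decide(new SubscriberErrorException(new IllegalStateException("We cannot ship via Europe"))));
    }
}
```

Keeping the classification in one function like this makes the retry policy easy to test separately from the loop that creates and closes workers.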

Subscription that ends when no documents are left

Here we create a subscription client that runs only until there are no more new documents left to process.

This is useful for ad-hoc, single-use processing where the user needs to ensure that all documents are fully processed.

SubscriptionWorkerOptions options = new SubscriptionWorkerOptions(subsId);

// Here we ask the worker to stop when there are no documents left to send.
// It will throw SubscriptionClosedException when it finishes its job
options.setCloseWhenNoDocsLeft(true);
SubscriptionWorker<OrderAndCompany> highValueOrdersWorker = store
    .subscriptions().getSubscriptionWorker(OrderAndCompany.class, options);

try {
    highValueOrdersWorker.run(batch -> {
        for (SubscriptionBatch.Item<OrderAndCompany> item : batch.getItems()) {
            sendThankYouNoteToEmployee(item.getResult());
        }
    });
} catch (SubscriptionClosedException e) {
    // that's expected
}
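
The retry example earlier waits for the worker by calling `.get()` on the value returned by `run`. Assuming that value behaves like a standard `java.util.concurrent.CompletableFuture`, a failure inside the task reaches the caller wrapped in an `ExecutionException` only once `get()` is called. The sketch below shows that standard-library behavior with a locally defined stand-in exception, `ClosedSignal`; that class and the `endedWithExpectedSignal` helper are hypothetical, and the code does not call the RavenDB client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class FutureFailureSketch {

    // Hypothetical stand-in for an "expected" end-of-work signal.
    static class ClosedSignal extends RuntimeException {
        ClosedSignal(String message) { super(message); }
    }

    // Waits for the task and reports whether it ended with the expected signal.
    // Any other failure is rethrown as an IllegalStateException.
    static boolean endedWithExpectedSignal(CompletableFuture<Void> task) {
        try {
            task.get();
            return false; // completed without an exception
        } catch (ExecutionException e) {
            // get() wraps the task's failure; the original exception is the cause
            if (e.getCause() instanceof ClosedSignal) {
                return true;
            }
            throw new IllegalStateException("unexpected failure", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
            throw new ClosedSignal("no documents left");
        });
        System.out.println(endedWithExpectedSignal(task));
    }
}
```

Under the same assumption, a `catch` clause sees the task's exception only if the surrounding code actually waits on the returned future.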

Worker that processes raw objects

Here we create a worker that processes received data as ObjectNode objects.

String subscriptionName = "My dynamic subscription";

SubscriptionCreationOptions subscriptionCreationOptions = new SubscriptionCreationOptions();
subscriptionCreationOptions.setName(subscriptionName);
subscriptionCreationOptions.setQuery("from Orders as o \n" +
    "select { \n" +
    "   DynamicField_1: 'Company:' + o.Company + ' Employee: ' + o.Employee \n" +
    "}");

// create the subscription on the server before opening a worker for it
store.subscriptions().create(subscriptionCreationOptions);

SubscriptionWorker<ObjectNode> worker = store.subscriptions().getSubscriptionWorker(subscriptionName);
worker.run(x -> {
    for (SubscriptionBatch.Item<ObjectNode> item : x.getItems()) {
        ObjectNode result = item.getResult();
        raiseNotification(result.get("DynamicField_1"));
    }
});

Worker that operates with a session

Here we create a subscription that sends Order documents that do not have a shipping date.
The worker receiving these documents will update the ShippedAt field value and save the document back to the server via the session.

SubscriptionCreationOptions subscriptionCreationOptions = new SubscriptionCreationOptions();
subscriptionCreationOptions.setQuery("from Orders as o where o.ShippedAt = null");
String subscriptionName = store.subscriptions().create(subscriptionCreationOptions);

SubscriptionWorker<Order> subscriptionWorker = store.subscriptions().getSubscriptionWorker(Order.class, subscriptionName);

subscriptionWorker.run(batch -> {
    try (IDocumentSession session = batch.openSession()) {
        for (SubscriptionBatch.Item<Order> orderItem : batch.getItems()) {
            transferOrderToShipmentCompany(orderItem.getResult());
            orderItem.getResult().setShippedAt(new Date());
        }

        // we know that we have at least one order to ship,
        // because the subscription query above has that in its WHERE clause
        session.saveChanges();
    }
});

Subscription that uses included documents

Here we create a subscription that, in addition to sending all the Order documents to the worker,
will include all the referenced Product documents in the batch sent to the worker.

When the worker accesses these Product documents, no additional requests will be made to the server.

SubscriptionCreationOptions subscriptionCreationOptions = new SubscriptionCreationOptions();
subscriptionCreationOptions.setQuery("from Orders include Lines[].Product");


String subscriptionName = store.subscriptions().create(subscriptionCreationOptions);

SubscriptionWorker<Order> subscriptionWorker = store.subscriptions().getSubscriptionWorker(Order.class, subscriptionName);

subscriptionWorker.run(batch -> {
    try (IDocumentSession session = batch.openSession()) {
        for (SubscriptionBatch.Item<Order> orderItem : batch.getItems()) {
            Order order = orderItem.getResult();
            for (OrderLine orderLine : order.getLines()) {
                // this line won't generate a request, because orderLine.Product was included
                Product product = session.load(Product.class, orderLine.getProduct());
                raiseProductNotification(order, product);
            }
        }
    }
});

Primary and secondary workers

Here we create two workers:

  • The primary worker, with a TAKE_OVER strategy, will take over the connection from the other worker and establish its own.
  • The secondary worker, with a WAIT_FOR_FREE strategy, will wait for the first worker to fail (due to machine failure, etc.).

The primary worker:

SubscriptionWorkerOptions options1 = new SubscriptionWorkerOptions(subscriptionName);
options1.setStrategy(SubscriptionOpeningStrategy.TAKE_OVER);

while (true) {
    // create a new worker for each attempt, as in the retry example above
    SubscriptionWorker<Order> worker1 = store.subscriptions().getSubscriptionWorker(Order.class, options1);
    try {
        worker1
            .run(x -> {
                // your logic
            })
            .get(); // block until the worker stops
    } catch (Exception e) {
        // retry
    } finally {
        worker1.close();
    }
}

The secondary worker:

SubscriptionWorkerOptions options2 = new SubscriptionWorkerOptions(subscriptionName);
options2.setStrategy(SubscriptionOpeningStrategy.WAIT_FOR_FREE);

while (true) {
    // create a new worker for each attempt, as in the retry example above
    SubscriptionWorker<Order> worker2 = store.subscriptions().getSubscriptionWorker(Order.class, options2);
    try {
        worker2
            .run(x -> {
                // your logic
            })
            .get(); // block until the worker stops
    } catch (Exception e) {
        // retry
    } finally {
        worker2.close();
    }
}
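
The interplay between the two strategies can be pictured with a toy model of a subscription that accepts a single connected worker at a time. This is only an in-memory illustration of the behavior described above, not the server's implementation. `SingleSlotSketch`, its methods, and the local `Strategy` enum are hypothetical names, and a displaced worker is simply dropped rather than reconnecting.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class SingleSlotSketch {

    enum Strategy { TAKE_OVER, WAIT_FOR_FREE }

    private String active; // currently connected worker, or null if the slot is free
    private final Deque<String> waiting = new ArrayDeque<>(); // workers waiting for the slot

    // Tries to connect a worker; returns true if it became the active worker.
    boolean connect(String worker, Strategy strategy) {
        if (active == null || strategy == Strategy.TAKE_OVER) {
            active = worker; // TAKE_OVER displaces whoever was connected
            return true;
        }
        waiting.addLast(worker); // WAIT_FOR_FREE queues behind the active worker
        return false;
    }

    // Simulates the active worker failing; the longest-waiting worker takes its place.
    void activeWorkerFailed() {
        active = waiting.pollFirst(); // null when nobody is waiting
    }

    String activeWorker() {
        return active;
    }

    public static void main(String[] args) {
        SingleSlotSketch subscription = new SingleSlotSketch();
        subscription.connect("primary", Strategy.TAKE_OVER);
        subscription.connect("secondary", Strategy.WAIT_FOR_FREE);
        System.out.println("active: " + subscription.activeWorker());
        subscription.activeWorkerFailed();
        System.out.println("active after failure: " + subscription.activeWorker());
    }
}
```

In this model the secondary worker becomes active only after the primary fails, and a primary that reconnects with TAKE_OVER reclaims the slot immediately.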