Data Subscriptions: Subscription Consumption Examples



Worker with a specified batch size

Here we create a worker, specifying the maximum batch size we want to receive.

// Limit every batch handed to the worker to at most 20 documents.
const options = { 
    subscriptionName,
    maxDocsPerBatch: 20
};

const workerWBatch = store.subscriptions.getSubscriptionWorker(options);
workerWBatch.on("batch", (batch, callback) => {
    /* custom logic */

    // signal that the batch has been processed, as the other examples do
    callback();
});

Client with full exception handling and processing retries

Here we implement a client that handles exceptions thrown by the worker, and retries creating the worker if the exception is recoverable.

// Reconnection policy: tolerate a down time of up to 2 hours,
// and wait 2 minutes before each reconnection attempt.
const options = {
    subscriptionName,
    maxErroneousPeriod: 2 * 60 * 60 * 1000,
    timeToWaitBeforeConnectionRetry: 2 * 60 * 1000
};

// start the self-reconnecting worker with the policy above
setupReconnectingSubscription(options);

/**
 * Creates a subscription worker and keeps it running: on a recoverable error the
 * current worker is disposed and a fresh one is created; on an unrecoverable
 * error the error is rethrown from the "error" handler.
 *
 * @param {object} subscriptionWorkerOptions - options passed as-is to
 *     store.subscriptions.getSubscriptionWorker(); its subscriptionName is
 *     also used in log messages.
 */
function setupReconnectingSubscription(subscriptionWorkerOptions) {
    // read the name from the options rather than relying on an outer variable
    const { subscriptionName } = subscriptionWorkerOptions;

    // errors for which creating a new worker cannot help
    const unrecoverableErrorNames = new Set([
        "DatabaseDoesNotExistException",
        "SubscriptionDoesNotExistException",
        "SubscriptionInvalidStateException",
        "AuthorizationException"
    ]);

    let subscriptionWorker;

    reconnect();

    // Detaches all handlers from the given worker and disposes it.
    function closeWorker(worker) {
        worker.removeAllListeners();
        worker.on("error", () => {}); // ignore errors from old connection
        worker.dispose();
    }

    // Replaces the current worker (if any) with a newly created one
    // and wires up its event handlers.
    function reconnect() {
        if (subscriptionWorker) {
            // the worker to close must be passed explicitly
            closeWorker(subscriptionWorker);
        }

        subscriptionWorker = store.subscriptions.getSubscriptionWorker(subscriptionWorkerOptions);

        // here we are able to be informed of any exception that happens during processing
        subscriptionWorker.on("connectionRetry", error => {
            console.error(
                `Error during subscription processing: ${subscriptionName}`, error);
        });

        subscriptionWorker.on("batch", (batch, callback) => {
            for (const item of batch.items) {
                // we want to force close the subscription processing in that case
                // and let the external code decide what to do with that
                if (item.result.shipVia === "Europe") {
                    callback(new InvalidOperationException("We cannot ship via Europe."));
                    return;
                }

                processOrder(item.result);
            }

            // every item was processed - acknowledge the batch
            callback();
        });

        subscriptionWorker.on("error", error => {
            console.error(`Failure in subscription: ${subscriptionName}`, error);

            if (unrecoverableErrorNames.has(error.name)) {
                throw error;
            }

            if (error.name === "SubscriptionClosedException") {
                // closed explicitly by admin, probably
                return closeWorker(subscriptionWorker);
            }

            if (error.name === "SubscriberErrorException") {
                // for InvalidOperationException type, we want to throw an exception, otherwise
                // we continue processing
                // RavenDB client uses VError - it can nest errors and keep track of inner errors
                // under cause property
                if (error.cause && error.cause.name === "InvalidOperationException") {
                    throw error;
                }

                return reconnect();
            }

            // handle this depending on subscription
            // open strategy (discussed later)
            if (error.name === "SubscriptionInUseException") {
                return reconnect();
            }

            // any other error is treated as recoverable
            return reconnect();
        });

        subscriptionWorker.on("end", () => {
            closeWorker(subscriptionWorker);
        });
    }
}

Subscription that ends when no documents left

Here we create a subscription client that runs only until there are no more new documents left to process.

This is useful for ad-hoc, single-use processing that the user wants to be sure runs to completion.

const options = { 
    subscriptionName: subsId,
    // Here we ask the worker to stop when there are no documents left to send.
    // Will throw SubscriptionClosedException when it finishes its job
    closeWhenNoDocsLeft: true
};

const highValueOrdersWorker = store
    .subscriptions.getSubscriptionWorker(options);

highValueOrdersWorker.on("batch", async (batch, callback) => {
    try {
        // items are handled one at a time, in order
        for (const item of batch.items) {
            await sendThankYouNoteToEmployee(item.result);
        }
    } catch (err) {
        // report the failure instead of leaving the batch unacknowledged
        // and the promise rejection unhandled
        callback(err);
        return;
    }

    callback();
});

highValueOrdersWorker.on("error", err => {
    if (err.name === "SubscriptionClosedException") {
        // that's expected - the worker stopped because no documents were left
        return;
    }

    // anything else is a real failure - do not swallow it silently
    console.error(`Failure in subscription: ${subsId}`, err);
});

Two subscription workers that are waiting for each other

Here we create two workers:

  • The main worker with the TakeOver strategy that will take over the other one and will take the lead

  • The secondary worker that will wait for the first one to fail (due to machine failure, etc.)

The main worker:

// Main worker: uses the "TakeOver" strategy so that it takes over
// the subscription from the other worker and takes the lead.
const options1 = {
    subscriptionName,
    strategy: "TakeOver",
    documentType: Order
};

const worker1 = store.subscriptions.getSubscriptionWorker(options1);

worker1.on("batch", (batch, done) => {
    // process the batch here, then signal completion
    done();
});

worker1.on("error", error => {
    // retry
});

The secondary worker:

// Secondary worker: uses the "WaitForFree" strategy so that it waits
// for the main worker to fail before it starts processing.
const options2 = {
    subscriptionName,
    strategy: "WaitForFree",
    documentType: Order
};

const worker2 = store.subscriptions.getSubscriptionWorker(options2);

worker2.on("batch", (batch, done) => {
    // process the batch here, then signal completion
    done();
});

worker2.on("error", error => {
    // retry
});