You are currently browsing legacy 4.0 version of documentation. Click here to switch to the newest 4.2 version.

We can help you with migration to the latest RavenDB

Contact Us Now
see on GitHub

Data Subscriptions: Subscription Consumption Examples



Worker with a specified batch size

Here we create a worker, specifying the maximum batch size we want to receive.

// Limit every batch handed to this worker to at most 20 documents.
const options = { 
    subscriptionName,
    maxDocsPerBatch: 20
};

const workerWBatch = store.subscriptions.getSubscriptionWorker(options);
workerWBatch.on("batch", (batch, callback) => {
    /* custom logic */

    // Signal that this batch has been processed
    // (pass an error to callback instead to report a failure).
    callback();
});

Client with full exception handling and processing retries

Here we implement a client that handles exceptions thrown by the worker and retries creating the worker if the exception is recoverable.

// Tolerate up to 2 hours of downtime, and wait 2 minutes before each
// reconnection attempt. Both durations are expressed in milliseconds.
const twoHoursMs = 2 * 3600 * 1000;
const twoMinutesMs = 2 * 60 * 1000;

const options = {
    subscriptionName: subscriptionName,
    maxErroneousPeriod: twoHoursMs,
    timeToWaitBeforeConnectionRetry: twoMinutesMs
};

setupReconnectingSubscription(options);

/**
 * Opens a subscription worker and keeps the subscription alive: when the
 * worker reports a recoverable error, the current worker is closed and a
 * fresh one is created from the same options.
 *
 * Relies on `store`, `processOrder` and `InvalidOperationException` being
 * available in the enclosing scope.
 *
 * @param {object} subscriptionWorkerOptions - options passed unchanged to
 *     `store.subscriptions.getSubscriptionWorker()`; its `subscriptionName`
 *     is also used in log messages.
 * @throws rethrows (from inside the "error" listener) errors named
 *     DatabaseDoesNotExistException, SubscriptionDoesNotExistException,
 *     SubscriptionInvalidStateException or AuthorizationException, and
 *     SubscriberErrorException whose cause is an InvalidOperationException.
 */
function setupReconnectingSubscription(subscriptionWorkerOptions) {
    // Read the name from the options we were given instead of depending on
    // an outer variable that happens to have the same name.
    const { subscriptionName } = subscriptionWorkerOptions;

    // Errors after which reconnecting makes no sense.
    const fatalErrorNames = [
        "DatabaseDoesNotExistException",
        "SubscriptionDoesNotExistException",
        "SubscriptionInvalidStateException",
        "AuthorizationException"
    ];

    let subscriptionWorker;

    reconnect();

    // Detaches our listeners from the given worker and disposes it.
    function closeWorker(worker) {
        worker.removeAllListeners();
        worker.on("error", () => {}); // ignore errors from old connection
        worker.dispose();
    }

    // Closes the current worker (if any) and wires up a new one.
    function reconnect() {
        if (subscriptionWorker) {
            // The worker must be passed explicitly; calling closeWorker()
            // without an argument fails on `undefined.removeAllListeners()`.
            closeWorker(subscriptionWorker);
        }

        const worker = store.subscriptions.getSubscriptionWorker(subscriptionWorkerOptions);
        subscriptionWorker = worker;

        // here we are able to be informed of any exception that happens during processing
        worker.on("connectionRetry", error => {
            console.error(
                "Error during subscription processing: " + subscriptionName, error);
        });

        worker.on("batch", (batch, callback) => {
            for (const item of batch.items) {
                // we want to force close the subscription processing in that case
                // and let the external code decide what to do with that
                if (item.result.shipVia === "Europe") {
                    callback(new InvalidOperationException("We cannot ship via Europe."));
                    return;
                }

                processOrder(item.result);
            }

            // Every item was handled: signal that the batch has been processed.
            callback();
        });

        worker.on("error", error => {
            console.error("Failure in subscription: " + subscriptionName, error);

            if (fatalErrorNames.includes(error.name)) {
                throw error;
            }

            if (error.name === "SubscriptionClosedException") {
                // closed explicitly by admin, probably
                return closeWorker(worker);
            }

            if (error.name === "SubscriberErrorException") {
                // for InvalidOperationException type, we want to throw an exception, otherwise
                // we continue processing
                // RavenDB client uses VError - it can nest errors and keep track of inner errors
                // under cause property
                if (error.cause && error.cause.name === "InvalidOperationException") {
                    throw error;
                }

                return reconnect();
            }

            // handle this depending on subscription
            // open strategy (discussed later)
            if (error.name === "SubscriptionInUseException") {
                return reconnect();
            }

            // Anything else is treated as recoverable.
            return reconnect();
        });

        worker.on("end", () => {
            closeWorker(worker);
        });
    }
}

Subscription that ends when no documents left

Here we create a subscription client that runs only until there are no more new documents left to process.

This is useful for ad-hoc, single-use processing that the user wants to be sure is performed completely.

const options = { 
    subscriptionName: subsId,
    // Here we ask the worker to stop when there are no documents left to send.
    // The worker then reports a SubscriptionClosedException (handled in the
    // "error" listener below) when it finishes its job.
    closeWhenNoDocsLeft: true
};

const highValueOrdersWorker = store
    .subscriptions.getSubscriptionWorker(options);

highValueOrdersWorker.on("batch", async (batch, callback) => {
    try {
        // Notes are sent one at a time, in batch order.
        for (const item of batch.items) {
            await sendThankYouNoteToEmployee(item.result);
        }
    } catch (err) {
        // Report the failure through the callback; otherwise the rejection
        // of this async handler would go unhandled and callback would
        // never be invoked for this batch.
        callback(err);
        return;
    }

    callback();
});

highValueOrdersWorker.on("error", err => {
    if (err.name === "SubscriptionClosedException") {
        // that's expected: there were no documents left to process
        return;
    }

    // Any other error is unexpected - do not swallow it silently.
    console.error("Failure in subscription: " + subsId, err);
});

Two subscription workers that are waiting for each other

Here we create two workers:

  • The main worker with the TakeOver strategy that will take over the other one and will take the lead

  • The secondary worker that will wait for the first one to fail (due to machine failure etc.)

The main worker:

// Main worker: connects using the "TakeOver" strategy.
const options1 = {
    subscriptionName: subscriptionName,
    strategy: "TakeOver",
    documentType: Order
};

const worker1 = store.subscriptions.getSubscriptionWorker(options1);

worker1.on("batch", function (batch, callback) {
    // your logic
    callback();
});

worker1.on("error", function (err) {
    // retry
});

The secondary worker:

// Secondary worker: connects using the "WaitForFree" strategy.
const options2 = {
    subscriptionName: subscriptionName,
    strategy: "WaitForFree",
    documentType: Order
};

const worker2 = store.subscriptions.getSubscriptionWorker(options2);

worker2.on("batch", function (batch, callback) {
    // your logic
    callback();
});

worker2.on("error", function (err) {
    // retry
});