Subscription Consumption Examples
- In this page:
Client with full exception handling and processing retries
Here we implement a client that handles exceptions thrown by the worker.
If the exception is recoverable, the client retries creating the worker.
// Create the subscription task on the server:
// ===========================================
const subscriptionName = await documentStore.subscriptions.create({
name: "ProcessOrdersWithLowFreight",
query: "from Orders where Freight < 0.5"
});
// Create the subscription worker that will consume the documents:
// ===============================================================
await setupReconnectingWorker(subscriptionName);
async function setupReconnectingWorker(subscriptionName) {
let subscriptionWorker;
await reconnect();
function closeWorker(worker) {
worker.dispose();
}
async function reconnect() {
if (subscriptionWorker) {
closeWorker(subscriptionWorker);
}
// Configure the worker:
const subscriptionWorkerOptions = {
subscriptionName: subscriptionName,
// Allow a downtime of up to 2 hours
maxErroneousPeriod: 2 * 3600 * 1000,
// Wait 2 minutes before reconnecting
timeToWaitBeforeConnectionRetry: 2 * 60 * 1000
};
subscriptionWorker =
store.subscriptions.getSubscriptionWorker(subscriptionWorkerOptions);
// Subscribe to connection retry events,
// and log any exceptions that occur during processing
subscriptionWorker.on("connectionRetry", error => {
console.error(
"Error during subscription processing: " + subscriptionName, error);
});
// Run the worker:
// ===============
subscriptionWorker.on("batch", (batch, callback) => {
try {
for (const item of batch.items) {
const orderDocument = item.result;
// Forcefully stop subscription processing if the ID is "companies/46-A"
// and throw an exception to let external logic handle the specific case
if (orderDocument.Company && orderDocument.Company === "companies/46-A") {
// 'The InvalidOperationException' thrown from here
// will be wrapped by `SubscriberErrorException`
callback(new InvalidOperationException(
"Company ID can't be 'companies/46-A', pleases fix"));
return;
}
// Process the order document - provide your own logic
processOrder(orderDocument);
}
// Call 'callback' once you're done
// The worker will send an acknowledgement to the server,
// so that server can send next batch
callback();
}
catch(err) {
callback(err);
}
});
// Handle errors:
// ==============
subscriptionWorker.on("error", error => {
console.error("Failure in subscription: " + subscriptionName, error);
// The following exceptions are Not recoverable
if (error.name === "DatabaseDoesNotExistException" ||
error.name === "SubscriptionDoesNotExistException" ||
error.name === "SubscriptionInvalidStateException" ||
error.name === "AuthorizationException") {
throw error;
}
if (error.name === "SubscriptionClosedException") {
// Subscription probably closed explicitly by admin
return closeWorker(subscriptionWorker);
}
if (error.name === "SubscriberErrorException") {
// For the InvalidOperationException we want to throw an exception,
// otherwise, continue processing
if (error.cause && error.cause.name === "InvalidOperationException") {
throw error;
}
setTimeout(reconnect, 1000);
return;
}
// Handle this depending on the subscription opening strategy
if (error.name === "SubscriptionInUseException") {
setTimeout(reconnect, 1000);
return;
}
setTimeout(reconnect, 1000);
return;
});
// Handle worker end event:
// ========================
subscriptionWorker.on("end", () => {
closeWorker(subscriptionWorker);
});
}
}
Worker with a specified batch size
Here we create a worker and specify the maximum number of documents the server will send to the worker in each batch.
// Create the subscription task on the server:
// ===========================================
const subscriptionName = await documentStore.subscriptions.create({
name: "ProcessOrders",
query: "from Orders"
});
// Create the subscription worker that will consume the documents:
// ===============================================================
const workerOptions = {
subscriptionName: subscriptionName,
maxDocsPerBatch: 20 // Set the maximum number of documents per batch
};
const worker = documentStore.subscriptions.getSubscriptionWorker(workerOptions);
worker.on("batch", (batch, callback) => {
try {
// Add your logic for processing the incoming batch items here...
// Call 'callback' once you're done
// The worker will send an acknowledgement to the server,
// so that server can send next batch
callback();
} catch(err) {
callback(err);
}
});
Worker that operates with a session
Here we create a subscription that sends Order documents that do not have a shipping date.
The worker receiving these documents will update the ShippedAt
field value and save the document back to the server via the session.
Note:
The session is opened with batch.openSession
instead of with documentStore.openSession
.
// Create the subscription task on the server:
// ===========================================
const subscriptionName = await documentStore.subscriptions.create({
name: "ProcessOrdersThatWereNotShipped",
query: "from Orders as o where o.ShippedAt = null"
});
// Create the subscription worker that will consume the documents:
// ===============================================================
const workerOptions = { subscriptionName };
const worker = documentStore.subscriptions.getSubscriptionWorker(workerOptions);
worker.on("batch", async (batch, callback)
try {
// Open a session with 'batch.openSession'
const session = batch.openSession();
for (const item of batch.items) {
orderDocument = item.result;
transferOrderToShipmentCompany(orderDocument); // call your custom method
orderDocument.ShippedAt = new Date(); // update the document field
}
// Save the updated Order documents
await session.saveChanges();
callback();
} catch(err) {
callback(err);
}
});
Worker that processes dynamic objects
Here we define a subscription that projects the Order documents into a dynamic format.
The worker processes the dynamic objects it receives.
// Create the subscription task on the server:
// ===========================================
const subscriptionName = await documentStore.subscriptions.create({
name: "ProcessDynamicFields",
query: `From Orders as o
Select {
dynamicField: "Company: " + o.Company + " Employee: " + o.Employee,
}`
});
// Create the subscription worker that will consume the documents:
// ===============================================================
const workerOptions = { subscriptionName };
const worker = documentStore.subscriptions.getSubscriptionWorker(workerOptions);
worker.on("batch", (batch, callback) => {
for (const item of batch.items) {
// Access the dynamic field in the document
const field = item.result.dynamicField;
// Call your custom method
processItem(field);
}
callback();
});
Subscription that ends when no documents are left
Here we create a subscription client that runs until there are no more new documents to process.
This is useful for ad-hoc, single-use processing where the user needs to ensure that all documents are fully processed.
// Create the subscription task on the server:
// ===========================================
// Define the filtering criteria
const query = `
declare function getOrderLinesSum(doc) {
var sum = 0;
for (var i in doc.Lines) {
sum += doc.Lines[i].PricePerUnit * doc.Lines[i].Quantity;
}
return sum;
}
from Orders as o
where getOrderLinesSum(o) > 10_000`;
// Create the subscription with the defined query
const subscriptionName = await documentStore.subscriptions.create({ query });
// Create the subscription worker that will consume the documents:
// ===============================================================
const workerOptions = {
subscriptionName: subscriptionName,
// Here we set the worker to stop when there are no more documents left to send
// Will throw SubscriptionClosedException when it finishes it's job
closeWhenNoDocsLeft: true
};
const highValueOrdersWorker =
documentStore.subscriptions.getSubscriptionWorker(workerOptions);
highValueOrdersWorker.on("batch", (batch, callback) => {
for (const item of batch.items) {
sendThankYouNoteToEmployee(item.result); // call your custom method
}
callback();
});
highValueOrdersWorker.on("error", err => {
if (err.name === "SubscriptionClosedException") {
// That's expected, no more documents to process
}
});
Subscription that uses included documents
Here we create a subscription that, in addition to sending all the Order documents to the worker,
will include all the referenced Product documents in the batch sent to the worker.
When the worker accesses these Product documents, no additional requests will be made to the server.
// Create the subscription task on the server:
// ===========================================
const subscriptionName = await documentStore.subscriptions.create({
name: "ProcessIncludedDocuments",
query: `from Orders include Lines[].Product`
});
// Create the subscription worker that will consume the documents:
// ===============================================================
const workerOptions = { subscriptionName };
const worker = documentStore.subscriptions.getSubscriptionWorker(workerOptions);
worker.on("batch", async (batch, callback) => {
// Open a session via 'batch.openSession'
// in order to access the Product documents
const session = batch.openSession();
for (const item of batch.items) {
const orderDocument = item.result;
for (const orderLine of orderDocument.Lines)
{
// Calling 'load' will Not generate a request to the server,
// because orderLine.Product was included in the batch
const product = await session.load(orderLine.Product);
const productName = product.Name;
// Call your custom method
processOrderAndProduct(order, product);
}
}
callback();
});
Primary and secondary workers
Here we create two workers:
- The primary worker, with a
TakeOver
strategy, will take over the other worker and establish the connection. - The secondary worker, with a
WaitForFree
strategy, will wait for the first worker to fail (due to machine failure, etc.).
The primary worker:
const workerOptions1 = {
subscriptionName,
strategy: "TakeOver",
documentType: Order
};
const worker1 = documentStore.subscriptions.getSubscriptionWorker(workerOptions1);
worker1.on("batch", (batch, callback) => {
// your logic
callback();
});
worker1.on("error", err => {
// retry
});
The secondary worker:
const workerOptions2 = {
subscriptionName,
strategy: "WaitForFree",
documentType: Order
};
const worker2 = documentStore.subscriptions.getSubscriptionWorker(workerOptions2);
worker2.on("batch", (batch, callback) => {
// your logic
callback();
});
worker2.on("error", err => {
// retry
});