Data Subscriptions: Revisions Support



Regular subscription vs Revisions subscription

Regular subscription

  • Processed items:
    The subscription processes documents from the defined collection.
    Only the latest version of the document is processed, even if the document has revisions.
  • Query access scope:
    The subscription query running on the server has access only to the latest/current version of the documents.
  • Data sent to client:
    Each item in the batch sent to the client contains a single document (or a projection of it),
    as defined in the subscription.

Revisions subscription

  • Processed items:
    The subscription processes all revisions of documents from the defined collection,
    including revisions of deleted documents from the revision bin if they have not been purged.
  • Query access scope:
    For each revision, the subscription query running on the server has access to both the currently processed revision and its previous revision.
  • Data sent to client:
    By default, unless the subscription query is projecting specific fields, each item in the batch sent to the client contains both the processed revision (result.current) and its preceding revision (result.previous). If the document has just been created, the previous revision will be null.

  • For a revisions subscription to work,
    revisions must be configured and enabled for the collection the subscription manages.

  • A document that has no revisions will not be processed, so make sure that your revisions configuration does not purge revisions before the subscription has a chance to process them.

Revisions processing order

In a revisions subscription, revisions are processed in pairs of consecutive entries.
For example, consider the following User document:

{
    Name: "James",
    Age: "21"
}

We update this User document in two consecutive operations:

  • Update the 'Age' field to the value of 22
  • Update the 'Age' field to the value of 23

The subscription worker in the client will receive pairs of revisions (previous and current)
within each item in the batch, in the following order:

Batch item | Previous                     | Current
item #1    | null                         | { Name: "James", Age: "21" }
item #2    | { Name: "James", Age: "21" } | { Name: "James", Age: "22" }
item #3    | { Name: "James", Age: "22" } | { Name: "James", Age: "23" }
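
The ordering above can be reproduced locally in plain JavaScript. The following is a minimal sketch that only mimics how each revision is paired with its predecessor; it does not contact a server, and `pairRevisions` is a hypothetical helper introduced here for illustration, not part of the client API.

```javascript
// Hypothetical helper: pairs each revision with the one that preceded it.
// The first revision has no predecessor, so its `previous` is null.
function pairRevisions(revisions) {
    return revisions.map((current, index) => ({
        previous: index === 0 ? null : revisions[index - 1],
        current
    }));
}

// The three states of the User document from the example above
const history = [
    { Name: "James", Age: "21" },
    { Name: "James", Age: "22" },
    { Name: "James", Age: "23" }
];

const pairs = pairRevisions(history);

for (const [i, pair] of pairs.entries()) {
    console.log(`item #${i + 1}`, pair.previous, pair.current);
}
```

Three revisions yield three batch items, and only the first one has a null previous revision.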

Simple creation and consumption

Here we set up a basic revisions subscription that will deliver pairs of consecutive Order document revisions to the client:

Create subscription:

const subscriptionName = await documentStore.subscriptions.create({
    // Add (Revisions = true) to your subscription RQL
    query: "From Orders (Revisions = true)"
});

Consume subscription:

const workerOptions = { subscriptionName };

const worker = 
    // Use method `getSubscriptionWorkerForRevisions`
    documentStore.subscriptions.getSubscriptionWorkerForRevisions(workerOptions);

worker.on("batch", (batch, callback) => {
    try {
        for (const item of batch.items) {

            // Access the previous revision via 'result.previous'
            const previousRevision = item.result.previous;

            // Access the current revision via 'result.current'
            const currentRevision = item.result.current;
        }
        callback();
        
    } catch (err) {
        callback(err);
    }
});
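
Because `result.previous` is null for a newly created document, the loop body usually needs a guard before comparing the two revisions. The sketch below shows one possible way to do that in a plain function that could be called from inside the batch handler. `describeChange` and the sample items are hypothetical. Skipping the `@metadata` key and guarding against a missing `current` are defensive assumptions, not behavior stated above.

```javascript
// Hypothetical helper: summarizes one batch item of a revisions subscription.
function describeChange(item) {
    const { previous, current } = item.result;

    // Defensive assumption: treat a missing current revision as its own case
    if (!current) return `${item.id}: no current revision`;

    // previous is null when the document has just been created
    if (!previous) return `${item.id}: created`;

    // Compare top-level fields; metadata is skipped (assumed to differ per revision)
    const changed = Object.keys(current).filter(key =>
        key !== "@metadata" &&
        JSON.stringify(current[key]) !== JSON.stringify(previous[key])
    );

    return `${item.id}: changed ${changed.join(", ") || "nothing"}`;
}

// Stand-in items shaped like the ones received in the handler (sample data only)
const sampleItems = [
    { id: "orders/1-A", result: { previous: null, current: { Freight: 10 } } },
    { id: "orders/1-A", result: { previous: { Freight: 10 }, current: { Freight: 12 } } }
];

const summaries = sampleItems.map(describeChange);
console.log(summaries);
```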

Filtering revisions

Here we set up a revisions subscription that will send the client only document revisions in which the order was shipped to Mexico.

Create subscription:

const subscriptionName = await documentStore.subscriptions.create({
    // Provide filtering logic
    // Only revisions that were shipped to Mexico will be sent to subscribed clients
    query: `declare function isSentToMexico(doc) { 
                return doc.Current.ShipTo.Country == 'Mexico'
            }

            from 'Orders' (Revisions = true) as doc
            where isSentToMexico(doc) == true`
});

Consume subscription:

const workerOptions = { subscriptionName };

const worker =
    documentStore.subscriptions.getSubscriptionWorkerForRevisions(workerOptions);

worker.on("batch", (batch, callback) => {
    try {
        for (const item of batch.items) {
            console.log(`
                This is a revision of document ${item.id}.
                The order in this revision was shipped at ${item.result.current.ShippedAt}.
            `);
        }
        callback();

    } catch (err) {
        callback(err);
    }
});
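
The filter function in the query is ordinary JavaScript, so its logic can be checked locally against hand-written pairs before the subscription is created. This is a minimal sketch under that assumption: the sample pairs are made up, and the server still evaluates the real function against actual revisions.

```javascript
// Local copy of the function declared in the subscription query above
function isSentToMexico(doc) {
    return doc.Current.ShipTo.Country == 'Mexico';
}

// Made-up revision pairs using the same Previous / Current shape as the query
const samplePairs = [
    {
        Previous: null,
        Current: { ShipTo: { Country: "Mexico" } }
    },
    {
        Previous: { ShipTo: { Country: "Mexico" } },
        Current: { ShipTo: { Country: "France" } }
    }
];

const matching = samplePairs.filter(isSentToMexico);
console.log(`${matching.length} of ${samplePairs.length} pairs pass the filter`);
```

Note that the filter inspects only `doc.Current`, so the second pair is rejected even though its previous revision was shipped to Mexico.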

Projecting fields from revisions

Here we define a revisions subscription that will filter the revisions and send projected data to the client.

Create subscription:

const subscriptionName = await documentStore.subscriptions.create({
    // Filter revisions by the revenue delta.
    // The subscription will only process revisions where the revenue
    // exceeds the revenue of the preceding revision by more than 2500.
    
    query: `declare function isRevenueDeltaAboveThreshold(doc, threshold) { 
                return doc.Previous !== null && doc.Current.Lines.map(function(x) {
                    return x.PricePerUnit * x.Quantity;
                }).reduce((a, b) => a + b, 0) > doc.Previous.Lines.map(function(x) { 
                    return x.PricePerUnit * x.Quantity;
                }).reduce((a, b) => a + b, 0) + threshold
            }

            from 'Orders' (Revisions = true) as doc
            where isRevenueDeltaAboveThreshold(doc, 2500)

            // Define the projected fields that will be sent to the client:
            select {
                previousRevenue: doc.Previous.Lines.map(function(x) {
                    return x.PricePerUnit * x.Quantity;
                }).reduce((a, b) => a + b, 0),
      
                currentRevenue: doc.Current.Lines.map(function(x) {
                    return x.PricePerUnit * x.Quantity;
                }).reduce((a, b) => a + b, 0)
            }`
});

Projection class:

class OrderRevenues {
    constructor() {
        // Initialize the projected fields explicitly
        this.previousRevenue = null;
        this.currentRevenue = null;
    }
}

Consume subscription:

Since the subscription query projects specific fields from the revisions,
each item received in the batch has the shape of the OrderRevenues class rather than the default result.previous and result.current fields shown in the simple example.

const workerOptions = { 
    subscriptionName: subscriptionName,
    documentType: OrderRevenues
};

const worker =
    // Note: in this case, where each resulting item in the batch is a projected object
    // and not the revision itself, we use method `getSubscriptionWorker`
    documentStore.subscriptions.getSubscriptionWorker(workerOptions);

worker.on("batch", (batch, callback) => {
    try {
        for (const item of batch.items) {
            // Access the projected content:
            console.log(`
                Revenue for order with ID: ${item.id}
                has grown from ${item.result.previousRevenue}
                to ${item.result.currentRevenue}
            `);
        }
        callback();

    } catch (err) {
        callback(err);
    }
});
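
The subscription query in this example repeats the same revenue calculation four times. The sketch below factors that calculation into a helper and runs the filter and the projection locally on made-up data, which can be a convenient way to check the threshold logic. `revenue`, `project`, and the sample pairs are hypothetical names introduced here, and this local version is not a replacement for the query itself.

```javascript
// Hypothetical helper: total revenue of an order's lines
function revenue(lines) {
    return lines.reduce((sum, x) => sum + x.PricePerUnit * x.Quantity, 0);
}

// Same condition as in the query: skip newly created documents (no previous
// revision) and require the revenue to grow by more than the threshold
function isRevenueDeltaAboveThreshold(doc, threshold) {
    return doc.Previous !== null &&
        revenue(doc.Current.Lines) > revenue(doc.Previous.Lines) + threshold;
}

// Same shape as the projection in the query's select clause
function project(doc) {
    return {
        previousRevenue: revenue(doc.Previous.Lines),
        currentRevenue: revenue(doc.Current.Lines)
    };
}

// Made-up revision pair: revenue grows from 1000 to 4000
const samplePair = {
    Previous: { Lines: [{ PricePerUnit: 100, Quantity: 10 }] },
    Current: {
        Lines: [
            { PricePerUnit: 100, Quantity: 10 },
            { PricePerUnit: 500, Quantity: 6 }
        ]
    }
};

// A newly created document has no previous revision
const createdPair = { Previous: null, Current: samplePair.Current };

const passes = isRevenueDeltaAboveThreshold(samplePair, 2500);
const createdPasses = isRevenueDeltaAboveThreshold(createdPair, 2500);
const projected = project(samplePair);

console.log(passes, createdPasses, projected);
```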