Data Subscriptions: Revisions Support


Data subscription supports subscribing not only on documents, but also on all their revisions.
Revision support should be defined in the subscription. It also requires revisions to be configured on the collection in question.
While regular subscriptions process single documents, subscription on documents revisions processes pairs of subsequent document revisions.
Such functionality allows keeping track of each change that was performed on a document, and even to compare two subsequent versions of a document.
Both document revisions are accessible in the filtering and the projection process.


Revisions processing order

Documents revisions feature allows tracking changes that were performed on a document, by storing the audit trail of its changes over time.
An audit trail entry is called a Document Revision and is comprised of a document snapshot.

In data subscription, Documents Revisions will be processed in pairs of subsequent entries.
Example: Let us assume a user document that looks like:

{ Name:'James', Age:'21' }

  • We update the User document twice, in separate operations:
    • We update the 'Age' field to the value of 22
    • We update the 'Age' field to the value of 23

Data subscription's revision processing mechanism will receive pairs of revision in the following order:

# Previous Current
1 null { Name:'James', Age:'21' }
2 { Name:'James', Age:'21' } { Name:'James', Age:'22' }
3 { Name:'James', Age:'22' } { Name:'James', Age:'23' }

Warning

As seen above, in order for subscriptions on revisions to work properly, it needs the revisions entries to be available, otherwise, there will be no data to process. Therfore, it's crucial to make sure that the revisions configuration allows storing documents revisions enough time, without discarding unprocessed revisions

Simple declaration and usage

Here we declare a simple revisions subscription that will send pairs of subsequent document revisions to the client:

Creation:

name = store.subscriptions().createForRevisions(Order.class);
SubscriptionCreationOptions options = new SubscriptionCreationOptions();
options.setQuery("from orders (Revisions = true)");
name = store.subscriptions().createForRevisions(Order.class, options);

Consumption:

SubscriptionWorker<Revision<Order>> revisionWorker = store
    .subscriptions().getSubscriptionWorkerForRevisions(Order.class, name);
revisionWorker.run(x -> {
    for (SubscriptionBatch.Item<Revision<Order>> documentsPair : x.getItems()) {

        Order prev = documentsPair.getResult().getPrevious();
        Order current = documentsPair.getResult().getCurrent();

        processOrderChanges(prev, current);
    }
});

Revisions processing and projection

Here we declare a revisions subscription that will filter and project data from revisions pairs:

Creation:

SubscriptionCreationOptions options = new SubscriptionCreationOptions();
options.setQuery("declare function getOrderLinesSum(doc) {" +
    "    var sum = 0;" +
    "    for (var i in doc.Lines) { sum += doc.Lines[i]; } " +
    "    return sum;" +
    "}" +
    "" +
    "  from orders (Revisions = true) " +
    " where getOrderLinesSum(this.Current) > getOrderLinesSum(this.Previous) " +
    " select {" +
    "  previousRevenue: getOrderLinesSum(this.Previous)," +
    "  currentRevenue: getOrderLinesSum(this.Current)" +
    "}");

name = store.subscriptions().create(options);

Consumption:

SubscriptionWorker<Revision<Order>> revisionWorker = store
    .subscriptions().getSubscriptionWorkerForRevisions(Order.class, name);
revisionWorker.run(x -> {
    for (SubscriptionBatch.Item<Revision<Order>> documentsPair : x.getItems()) {

        Order prev = documentsPair.getResult().getPrevious();
        Order current = documentsPair.getResult().getCurrent();

        processOrderChanges(prev, current);
    }
});