Data Subscriptions: Revisions Support



Regular subscription vs Revisions subscription

Regular subscription

  • Processed items:
    The subscription processes documents from the defined collection.
    Only the latest version of the document is processed, even if the document has revisions.
  • Query access scope:
    The subscription query running on the server has access only to the latest/current version of the documents.
  • Data sent to client:
    Each item in the batch sent to the client contains a single document (or a projection of it),
    as defined in the subscription.

Revisions subscription

  • Processed items:
    The subscription processes all revisions of documents from the defined collection,
    including revisions of deleted documents from the revision bin if they have not been purged.
  • Query access scope:
    For each revision, the subscription query running on the server has access to both the currently processed revision and its previous revision.
  • Data sent to client:
    By default, unless the subscription query is projecting specific fields, each item in the batch sent to the client contains both the processed revision (Result.Current) and its preceding revision (Result.Previous). If the document has just been created, the previous revision will be null.

  • For a revisions subscription to work, revisions must be configured and enabled
    for the collection the subscription manages.

  • A document that has no revisions will not be processed, so make sure that your revisions configuration does not purge revisions before the subscription has a chance to process them.

Revisions processing order

A revisions subscription processes revisions in pairs of consecutive entries: each revision together with the one that preceded it.
For example, consider the following User document:

{
    Name: "James",
    Age: "21"
}

We update this User document in two consecutive operations:

  • Update the 'Age' field to the value of 22
  • Update the 'Age' field to the value of 23

The subscription worker in the client will receive pairs of revisions (Previous and Current)
within each item in the batch, in the following order:

Batch item   Previous                        Current
item #1      null                            { Name: "James", Age: "21" }
item #2      { Name: "James", Age: "21" }    { Name: "James", Age: "22" }
item #3      { Name: "James", Age: "22" }    { Name: "James", Age: "23" }
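
The pairing shown in the table can be reproduced locally with a minimal sketch. It assumes nothing about the RavenDB client: `PairRevisions` is a hypothetical helper written for illustration, and each revision is represented by a plain string rather than a document.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the RavenDB client API):
// pairs each revision with the one before it, oldest first.
static List<(string? Previous, string Current)> PairRevisions(IReadOnlyList<string> revisions)
{
    var pairs = new List<(string? Previous, string Current)>();
    for (int i = 0; i < revisions.Count; i++)
    {
        // The first revision has no predecessor, so its Previous is null
        string? preceding = i == 0 ? null : revisions[i - 1];
        pairs.Add((preceding, revisions[i]));
    }
    return pairs;
}

// The three states of the User document from the example above
var history = new[] { "Age: 21", "Age: 22", "Age: 23" };

foreach (var (prev, curr) in PairRevisions(history))
    Console.WriteLine($"Previous: {prev ?? "null"}, Current: {curr}");
```

Three revisions yield three pairs, and only the first pair has a null Previous, which is why client code should check for null before reading it.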

Simple creation and consumption

Here we set up a basic revisions subscription that will deliver pairs of consecutive Order document revisions to the client:

Create subscription:

subscriptionName = store.Subscriptions.Create(
    // Use <Revision<documentClassType>> as the type for the processed items
    // e.g. <Revision<Order>>
    new SubscriptionCreationOptions<Revision<Order>>());

// Alternatively, define the same subscription using RQL:
subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
{
    // Add (Revisions = true) to your subscription RQL
    Query = @"From Orders (Revisions = true)"
});

Consume subscription:

SubscriptionWorker<Revision<Order>> revisionsWorker = 
    // Specify <Revision<Order>> as the type of the processed items
    store.Subscriptions.GetSubscriptionWorker<Revision<Order>>(subscriptionName);

await revisionsWorker.Run((SubscriptionBatch<Revision<Order>> batch) =>
{
    foreach (var item in batch.Items)
    {
        // Access the previous revision via 'Result.Previous'
        var previousRevision = item.Result.Previous;

        // Access the current revision via 'Result.Current'
        var currentRevision = item.Result.Current;

        // Provide your own processing logic:
        ProcessOrderRevisions(previousRevision, currentRevision);
    }
});

Filtering revisions

Here we set up a revisions subscription that will send the client only document revisions in which the order was shipped to Mexico.

Create subscription:

subscriptionName = store.Subscriptions.Create(
    // Specify <Revision<Order>> as the type of the processed items
    new SubscriptionCreationOptions<Revision<Order>>()
    {
        // Provide filtering logic
        // Only revisions in which the order was shipped to Mexico will be sent to subscribed clients
        Filter = revision => revision.Current.ShipTo.Country == "Mexico",
    });

// Alternatively, define the same subscription using RQL:
subscriptionName = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions()
{
    Query = @"declare function isSentToMexico(doc) { 
                  return doc.Current.ShipTo.Country == 'Mexico'
              }

              from 'Orders' (Revisions = true) as doc
              where isSentToMexico(doc) == true"
});

Consume subscription:

SubscriptionWorker<Revision<Order>> worker =
    store.Subscriptions.GetSubscriptionWorker<Revision<Order>>(subscriptionName);

await worker.Run(batch =>
{
    foreach (var item in batch.Items)
    {
        Console.WriteLine($@"
            This is a revision of document {item.Id}.
            The order in this revision was shipped at {item.Result.Current.ShippedAt}.");
    }
});

Projecting fields from revisions

Here we define a revisions subscription that will filter the revisions and send projected data to the client.

Create subscription:

subscriptionName = store.Subscriptions.Create(
    // Specify <Revision<Order>> as the type of the processed items within the query
    new SubscriptionCreationOptions<Revision<Order>>()
    {
        // Filter revisions by the revenue delta.
        // The subscription will only process revisions where the revenue
        // is higher than in the preceding revision by 2500.
        Filter = revision =>
            revision.Previous != null &&
            revision.Current.Lines.Sum(x => x.PricePerUnit * x.Quantity) > 
            revision.Previous.Lines.Sum(x => x.PricePerUnit * x.Quantity) + 2500,
        
        // Define the projected fields that will be sent to the client
        Projection = revision => new OrderRevenues()
        {
            PreviousRevenue = 
                revision.Previous.Lines.Sum(x => x.PricePerUnit * x.Quantity),
            
            CurrentRevenue = 
                revision.Current.Lines.Sum(x => x.PricePerUnit * x.Quantity)
        }
    });

// Alternatively, define the same subscription using RQL:
subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
{
    Query = @"declare function isRevenueDeltaAboveThreshold(doc, threshold) { 
                  return doc.Previous !== null && doc.Current.Lines.map(function(x) {
                      return x.PricePerUnit * x.Quantity;
                  }).reduce((a, b) => a + b, 0) > doc.Previous.Lines.map(function(x) { 
                      return x.PricePerUnit * x.Quantity;
                  }).reduce((a, b) => a + b, 0) + threshold
              }

              from 'Orders' (Revisions = true) as doc
              where isRevenueDeltaAboveThreshold(doc, 2500)

              select {
                  PreviousRevenue: doc.Previous.Lines.map(function(x) {
                      return x.PricePerUnit * x.Quantity;
                  }).reduce((a, b) => a + b, 0),

                  CurrentRevenue: doc.Current.Lines.map(function(x) {
                      return x.PricePerUnit * x.Quantity;
                  }).reduce((a, b) => a + b, 0)
              }"
});

// The class that the revision fields are projected into
public class OrderRevenues
{
    public decimal PreviousRevenue { get; set; }
    public decimal CurrentRevenue { get; set; }
}

Consume subscription:

Since the revision fields are projected into the OrderRevenues class in the subscription definition,
each item received in the batch has the format of this projected class instead of the default Result.Previous and Result.Current fields shown in the simple example.

SubscriptionWorker<OrderRevenues> revenuesComparisonWorker =
    // Use the projected class type 'OrderRevenues' for the items the worker will process
    store.Subscriptions.GetSubscriptionWorker<OrderRevenues>(subscriptionName);

await revenuesComparisonWorker.Run(batch =>
{
    foreach (var item in batch.Items)
    {
        // Access the projected content:
        Console.WriteLine($@"Revenue for order with ID: {item.Id}
                             has grown from {item.Result.PreviousRevenue}
                             to {item.Result.CurrentRevenue}");
    }
});
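
The revenue comparison used by the filter and projection in this example can be checked locally before creating the subscription. The following is a minimal in-memory sketch, not RavenDB code: order lines are modeled as hypothetical (PricePerUnit, Quantity) tuples, and `Revenue` and `IsRevenueDeltaAboveThreshold` are illustrative helpers that mirror the logic of the subscription definition.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for an order's Lines: (PricePerUnit, Quantity) tuples.
// Sums price * quantity over all lines, as the projection does.
static decimal Revenue(IEnumerable<(decimal PricePerUnit, int Quantity)> lines) =>
    lines.Sum(x => x.PricePerUnit * x.Quantity);

// Mirrors the subscription filter: a previous revision must exist, and the
// current revenue must exceed the previous revenue by more than the threshold.
static bool IsRevenueDeltaAboveThreshold(
    IEnumerable<(decimal PricePerUnit, int Quantity)>? previous,
    IEnumerable<(decimal PricePerUnit, int Quantity)> current,
    decimal threshold) =>
    previous != null && Revenue(current) > Revenue(previous) + threshold;

var previousLines = new[] { (PricePerUnit: 100m, Quantity: 10) }; // revenue 1000
var currentLines  = new[] { (PricePerUnit: 100m, Quantity: 40) }; // revenue 4000

// 4000 > 1000 + 2500, so this revision pair passes the filter
Console.WriteLine(IsRevenueDeltaAboveThreshold(previousLines, currentLines, 2500m)); // True

// A newly created document has no previous revision and is filtered out
Console.WriteLine(IsRevenueDeltaAboveThreshold(null, currentLines, 2500m)); // False
```

The null check comes first because the first revision of a document has no Previous, and reading its Lines would otherwise fail.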