Data Subscriptions: Revisions Support
-
When the Revisions feature is enabled, a document revision is created with each change made to the document.
Each revision contains a snapshot of the document at the time of modification, forming a complete audit trail. -
The Data Subscription feature supports subscribing not only to documents but also to their revisions.
This functionality allows the subscribed client to track changes made to documents over time. -
The revisions support is specified within the subscription definition.
See how to create and consume it in the examples below. -
In this page:
Regular subscription vs Revisions subscription
Regular subscription
- Processed items:
The subscription processes documents from the defined collection.
Only the latest version of the document is processed, even if the document has revisions. - Query access scope:
The subscription query running on the server has access only to the latest/current version of the documents. - Data sent to client:
Each item in the batch sent to the client contains a single document (or a projection of it),
as defined in the subscription.
Revisions subscription
- Processed items:
The subscription processes all revisions of documents from the defined collection,
including revisions of deleted documents from the revision bin if they have not been purged. - Query access scope:
For each revision, the subscription query running on the server has access to both the currently processed revision and its previous revision. - Data sent to client:
By default, unless the subscription query is projecting specific fields, each item in the batch sent to the client contains both the processed revision (Result.Current
) and its preceding revision (Result.Previous
). If the document has just been created, the previous revision will benull
.
-
In order for the revisions subscription to work,
Revisions must be configured and enabled for the collection the subscription manages. -
A document that has no revisions will Not be processed, so make sure that your revisions configuration does not purge revisions before the subscription has a chance to process them.
Revisions processing order
In the revisions subscription, revisions are processed in pairs of subsequent entries.
For example, consider the following User document:
{
Name: "James",
Age: "21"
}
We update this User document in two consecutive operations:
- Update the 'Age' field to the value of 22
- Update the 'Age' field to the value of 23
The subscription worker in the client will receive pairs of revisions ( Previous & Current )
within each item in the batch in the following order:
Batch item | Previous | Current |
---|---|---|
item #1 | null |
{ Name: "James", Age: "21" } |
item #2 | { Name: "James", Age: "21" } |
{ Name: "James", Age: "22" } |
item #3 | { Name: "James", Age: "22" } |
{ Name: "James", Age: "23" } |
Simple creation and consumption
Here we set up a basic revisions subscription that will deliver pairs of consecutive Order document revisions to the client:
Create subscription:
subscriptionName = store.Subscriptions.Create(
// Use <Revision<documentClassType>> as the type for the processed items
// e.g. <Revision<Order>>
new SubscriptionCreationOptions<Revision<Order>>());
subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
{
// Add (Revisions = true) to your subscription RQL
Query = @"From Orders (Revisions = true)"
});
Consume subscription:
SubscriptionWorker<Revision<Order>> revisionsWorker =
// Specify <Revision<Order>> as the type of the processed items
store.Subscriptions.GetSubscriptionWorker<Revision<Order>>(subscriptionName);
await revisionsWorker.Run((SubscriptionBatch<Revision<Order>> batch) =>
{
foreach (var item in batch.Items)
{
// Access the previous revision via 'Result.Previous'
var previousRevision = item.Result.Previous;
// Access the current revision via 'Result.Current'
var currentRevision = item.Result.Current;
// Provide your own processing logic:
ProcessOrderRevisions(previousRevision, currentRevision);
}
});
Filtering revisions
Here we set up a revisions subscription that will send the client only document revisions in which the order was shipped to Mexico.
Create subscription:
subscriptionName = store.Subscriptions.Create(
// Specify <Revision<Order>> as the type of the processed items
new SubscriptionCreationOptions<Revision<Order>>()
{
// Provide filtering logic
// Only revisions that where shipped to Mexico will be sent to subscribed clients
Filter = revision => revision.Current.ShipTo.Country == "Mexico",
});
subscriptionName = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions()
{
Query = @"declare function isSentToMexico(doc) {
return doc.Current.ShipTo.Country == 'Mexico'
}
from 'Orders' (Revisions = true) as doc
where isSentToMexico(doc) == true"
});
Consume subscription:
SubscriptionWorker<Revision<Order>> worker =
store.Subscriptions.GetSubscriptionWorker<Revision<Order>>(subscriptionName);
await worker.Run(batch =>
{
foreach (var item in batch.Items)
{
Console.WriteLine($@"
This is a revision of document {item.Id}.
The order in this revision was shipped at {item.Result.Current.ShippedAt}.");
}
});
Projecting fields from revisions
Here we define a revisions subscription that will filter the revisions and send projected data to the client.
Create subscription:
subscriptionName = store.Subscriptions.Create(
// Specify <Revision<Order>> as the type of the processed items within the query
new SubscriptionCreationOptions<Revision<Order>>()
{
// Filter revisions by the revenue delta.
// The subscription will only process revisions where the revenue
// is higher than in the preceding revision by 2500.
Filter = revision =>
revision.Previous != null &&
revision.Current.Lines.Sum(x => x.PricePerUnit * x.Quantity) >
revision.Previous.Lines.Sum(x => x.PricePerUnit * x.Quantity) + 2500,
// Define the projected fields that will be sent to the client
Projection = revision => new OrderRevenues()
{
PreviousRevenue =
revision.Previous.Lines.Sum(x => x.PricePerUnit * x.Quantity),
CurrentRevenue =
revision.Current.Lines.Sum(x => x.PricePerUnit * x.Quantity)
}
});
subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
{
Query = @"declare function isRevenueDeltaAboveThreshold(doc, threshold) {
return doc.Previous !== null && doc.Current.Lines.map(function(x) {
return x.PricePerUnit * x.Quantity;
}).reduce((a, b) => a + b, 0) > doc.Previous.Lines.map(function(x) {
return x.PricePerUnit * x.Quantity;
}).reduce((a, b) => a + b, 0) + threshold
}
from 'Orders' (Revisions = true) as doc
where isRevenueDeltaAboveThreshold(doc, 2500)
select {
PreviousRevenue: doc.Previous.Lines.map(function(x) {
return x.PricePerUnit * x.Quantity;
}).reduce((a, b) => a + b, 0),
CurrentRevenue: doc.Current.Lines.map(function(x) {
return x.PricePerUnit * x.Quantity;
}).reduce((a, b) => a + b, 0)
}"
});
public class OrderRevenues
{
public decimal PreviousRevenue { get; set; }
public decimal CurrentRevenue { get; set; }
}
Consume subscription:
Since the revision fields are projected into the OrderRevenues
class in the subscription definition,
each item received in the batch has the format of this projected class instead of the default Result.Previous
and Result.Current
fields,
as was demonstrated in the simple example.
SubscriptionWorker<OrderRevenues> revenuesComparisonWorker =
// Use the projected class type 'OrderRevenues' for the items the worker will process
store.Subscriptions.GetSubscriptionWorker<OrderRevenues>(subscriptionName);
await revenuesComparisonWorker.Run(batch =>
{
foreach (var item in batch.Items)
{
// Access the projected content:
Console.WriteLine($@"Revenue for order with ID: {item.Id}
has grown from {item.Result.PreviousRevenue}
to {item.Result.CurrentRevenue}");
}
});