The design of concurrent subscriptions in RavenDB
One of the interesting features of RavenDB is Subscriptions. These allow you to define a query and then subscribe to its results. When a client opens the subscription, RavenDB will send it all the documents matching the subscription query. This is an ongoing process, so you will also get documents that were modified after your subscription started. For example, you can ask to get: “All orders in UK”, and then use that to compute tax / import rules.
Subscriptions are ideal for a number of use cases, but backend business processing is where they shine. This is because of the other property that subscriptions have, the ability to process the subscription results reliably. In other words, a failure in processing a subscription batch will not lose anything; we can simply restart the subscription. In the same manner, a server failure will simply fail over to another node and things will continue processing normally. You can shut down the client for an hour or a week, and when the subscription is started again, it will process all the matching documents that were changed while it wasn’t listening.
Subscriptions currently have one very strong requirement. There can only ever be a single subscription client open at a given point. This is done to ensure that we can process the batches reliably. A subscription client will accept a batch, process it locally and then acknowledge it to the server, which will then send the next one.
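To make the accept, process, acknowledge cycle concrete, here is a minimal in-memory sketch. It is a toy model and not RavenDB code: the class, the `position` field and the `(position, doc_id)` tuples are all assumptions made for illustration. It shows that a batch which is never acknowledged is simply handed out again.

```python
class SingleClientSubscription:
    """Toy in-memory model of a single-client subscription (not RavenDB code)."""

    def __init__(self, changes):
        # changes: list of (position, doc_id) tuples, ordered by position
        self.changes = changes
        self.position = 0  # hypothetical: the last acknowledged position

    def next_batch(self, size):
        # Everything after the acknowledged position is still pending.
        pending = [c for c in self.changes if c[0] > self.position]
        return pending[:size]

    def acknowledge(self, batch):
        # Progress only moves when the client confirms the batch.
        if batch:
            self.position = max(self.position, batch[-1][0])


sub = SingleClientSubscription([(1, "orders/1"), (2, "orders/2"), (3, "orders/3")])
first = sub.next_batch(2)
# The client crashes here without acknowledging, then reconnects.
retry = sub.next_batch(2)   # same documents as `first`
sub.acknowledge(retry)
rest = sub.next_batch(2)    # only now does the subscription move on
```

With a single client there is exactly one batch outstanding at any time, which is why one marker is enough to describe the state of the subscription.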
Doing things in this manner ensures that there is an obvious path of progression in how the subscription operates. However, there are scenarios where you’ll want to use concurrent clients on a single subscription. For example, you may have a lengthy computation to run and want concurrent workers to parallelize the work. That is not a scenario that is currently supported, and it turns out that there are significant challenges in supporting it. I want to use this post to walk through them and consider possible options.
The first issue that we have to take into account is that the reliability of subscriptions is a hugely important feature, and we don’t want to lose it. This means that if we allow multiple concurrent clients, we have to have a way to handle a client going down. Right now, RavenDB keeps track of a single number to do so. You can think about it as the last modified date that was sent to the subscription client. This isn’t how it works, but it is a close enough lie that it saves us the deep details.
In other words, we send a batch of documents to the client and only update our record of the “last processed” when the batch is acknowledged. This design is simple and robust, but it cannot handle the situation when we have concurrent clients that are processing batches. We have to account for a client failing to process a batch, in which case the batch needs to be resent. It can be resent to the same client or to another one. That means that in addition to the “last processed” value, we also need to keep a record of the in-flight documents that were sent in batches and haven’t been acknowledged yet.
We keep track of our clients by holding on to the TCP connection. That means that as long as the connection is open, the batch of documents that was sent will be considered to be in transit. If the client that got the batch fails, we’ll have to note that (when we close the TCP connection) and then send the old batch to another client. There are issues with that, by the way. Different clients may have different batch sizes, for example: the batch we need to retry may have 100 documents, while the only available client takes 10 at a time.
There is another problem with this approach, however. Consider the case of a document that was sent to a client for processing. While it is being processed, it is modified again, which means that we have a problem. Do we send the document again to another client for processing? Remember that it is very likely that you’ll do something related to this document, and it can be a cause for bugs because two clients will get the same document (albeit two different versions of it) at the same time.
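One way to picture the extra bookkeeping is the sketch below. It is only a toy model under my own assumptions, not a proposed implementation: `last_sent`, `in_flight`, `retry` and the batch ids are hypothetical names. It keeps a single position for how far documents were handed out, plus a table of batches that are still waiting for an acknowledgment.

```python
import itertools


class ConcurrentSubscriptionState:
    """Toy model: one 'last sent' position plus a table of in-flight batches."""

    def __init__(self, changes):
        self.changes = changes   # list of (position, doc_id), ordered by position
        self.last_sent = 0       # how far documents have been handed out
        self.in_flight = {}      # batch_id -> documents not yet acknowledged
        self.retry = []          # documents from failed batches
        self._ids = itertools.count(1)

    def send_batch(self, size):
        # Serve documents from failed batches first, then new ones.
        batch = self.retry[:size]
        self.retry = self.retry[size:]
        fresh = [c for c in self.changes if c[0] > self.last_sent]
        batch += fresh[: size - len(batch)]
        if not batch:
            return None, []
        self.last_sent = max(self.last_sent, max(pos for pos, _ in batch))
        batch_id = next(self._ids)
        self.in_flight[batch_id] = batch
        return batch_id, batch

    def acknowledge(self, batch_id):
        # The client confirmed the batch, so we can forget about it.
        self.in_flight.pop(batch_id)

    def fail(self, batch_id):
        # The batch was never acknowledged: queue its documents again.
        self.retry.extend(self.in_flight.pop(batch_id))


state = ConcurrentSubscriptionState([(i, f"orders/{i}") for i in range(1, 5)])
a_id, a_docs = state.send_batch(2)   # goes to client A
b_id, b_docs = state.send_batch(2)   # goes to client B, concurrently
state.acknowledge(b_id)              # B finishes first
state.fail(a_id)                     # A never acknowledges
c_id, c_docs = state.send_batch(2)   # A's documents are handed out again
```

Note that B's acknowledgment arrives while A's earlier batch is still open, so a single “last processed” value could not describe this state on its own.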
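The batch size mismatch can be sketched as follows. Again, this is a simplified illustration with made-up names (`ConnectionTracker`, `conn-A` and so on), not how RavenDB manages connections. Documents that were in transit on a closed connection go into a retry queue, and the queue is cut into whatever batch size the next client asks for.

```python
from collections import deque


class ConnectionTracker:
    """Toy model: the open connection is what marks a batch as in transit."""

    def __init__(self):
        self.in_transit = {}   # connection_id -> documents sent on it
        self.retry = deque()   # documents waiting to be sent again

    def sent(self, connection_id, docs):
        self.in_transit[connection_id] = list(docs)

    def acknowledged(self, connection_id):
        self.in_transit.pop(connection_id, None)

    def connection_closed(self, connection_id):
        # Anything still in transit on a closed connection must be resent.
        self.retry.extend(self.in_transit.pop(connection_id, []))

    def next_retry_batch(self, client_batch_size):
        # Cut the old batch down to whatever the receiving client asked for.
        count = min(client_batch_size, len(self.retry))
        return [self.retry.popleft() for _ in range(count)]


tracker = ConnectionTracker()
tracker.sent("conn-A", [f"orders/{i}" for i in range(100)])
tracker.connection_closed("conn-A")   # client A died with 100 documents unacknowledged

batches = []
while tracker.retry:
    # Client B only takes 10 documents at a time.
    batches.append(tracker.next_retry_batch(10))
```

In this sketch the original batch of 100 stops existing as a unit once it fails. That hints that tracking in-flight state per document, rather than per batch, may be the simpler model.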
In order to support concurrent clients on the same subscription, we need to handle all of these problems.
- Keep track of all the documents that were sent and haven’t been acknowledged yet.
- Keep track of all the active connections and, if a connection is broken, re-schedule the documents that it didn’t acknowledge to be sent to other clients.
- When a document is about to be sent, we need to check that it isn’t already being processed (an earlier version of it, rather) by another client. If that is the case, we have to wait until that earlier version is acknowledged before allowing the document to be sent again.
The last item is meant to avoid concurrency issues when handling a single document. I think that limiting the work on a per-document basis is a reasonable behavior. If your model requires coordination across multiple distinct documents, that is something that you’ll need to implement directly. Implementing the “don’t send the same document to multiple clients at the same time” rule, on the other hand, is likely to result in a better experience all around.
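Here is a rough sketch of what the “don’t send the same document to multiple clients at the same time” check could look like. It is an illustration only: `DocumentGate`, the integer versions and the choice to remember just the newest deferred version are my assumptions for the example, not a settled design.

```python
class DocumentGate:
    """Toy model: at most one version of a document is out for processing."""

    def __init__(self):
        self.processing = {}   # doc_id -> version currently with a client
        self.deferred = {}     # doc_id -> newest version waiting its turn

    def try_send(self, doc_id, version):
        if doc_id in self.processing:
            # An earlier version is still being processed: hold this one back.
            self.deferred[doc_id] = version
            return False
        self.processing[doc_id] = version
        return True

    def acknowledge(self, doc_id):
        # Returns the deferred version that is now free to be sent, if any.
        self.processing.pop(doc_id, None)
        return self.deferred.pop(doc_id, None)


gate = DocumentGate()
sent_v1 = gate.try_send("orders/1", 1)      # goes out to client A
sent_v2 = gate.try_send("orders/1", 2)      # modified meanwhile, held back
sent_other = gate.try_send("orders/2", 1)   # a different document is not affected
released = gate.acknowledge("orders/1")     # client A is done with version 1
sent_v2_now = gate.try_send("orders/1", released)
```

Keeping only the newest deferred version means that intermediate versions are skipped if a document changes several times while it is out for processing. Whether that is acceptable is part of what needs to be decided.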
This post is meant to explore the design of such a feature, and as such, I would dearly love any and all feedback.