Compare Exchange Overview
-
The Compare Exchange feature allows you to perform cluster-wide interlocked distributed operations.
-
Unique Keys can be reserved in the Database Group accross the cluster.
Each key has an associated Value. -
Modifying these values is an interlocked compare exchange operation.
-
Once defined, the Compare Exchange Values can be accessed via GetCompareExchangeValuesOperation,
or by using RQL in a query (see example-I below) -
In this page:
Compare Exchange Transaction Scope
-
Since the compare-exchange operations guarantee atomicity across the entire cluster, the feature is not using the transaction associated with a session object, as a session transaction spans only a single node.
-
So if a compare-exchange operation has failed when used inside a session block, it will not be rolled back automatically upon a session transaction failure.
Creating a Key
- Provide the following when saving a key:
Parameter | Description |
---|---|
key | A string under which Value is saved, unique in the database scope across the cluster. |
value | The Value that is associated with the Key. Can be a number, string, boolean, array or any JSON formatted object. |
index | The Index number is indicating the version of Value. The Index is used for the concurrency control, similar to documents Etags. |
-
When creating a new 'Compare Exchange Key', the index should be set to
0
. -
The Put operation will succeed only if this key doesn't exist yet.
-
Note: Two different keys can have the same values as long as the keys are unique.
Updating a Key
-
Updating a 'Compare Exchange' key can be divided into 2 phases:
-
Get the existing Key. The associated Value and Index are received.
-
The Index obtained from the read operation is provided to the Put operation along with the new Value to be saved.
This save will succeed only if the index that is provided to the 'Put' operation is the same as the index that was received from the server in the previous 'Get', which means that the Value was not modified by someone else between the read and write operations.
-
Example I - Email Address Reservation
-
Compare Exchange can be used to maintain uniqueness across users emails accounts.
-
First try to reserve a new user email.
If the email is successfully reserved then save the user account document.
String email = "user@example.com";
User user = new User();
user.setEmail(email);
try (IDocumentSession session = store.openSession()) {
session.store(user);
// At this point, the user document has an Id assigned
// Try to reserve a new user email
// Note: This operation takes place outside of the session transaction,
// It is a cluster-wide reservation
CompareExchangeResult<String> cmpXchgResult = store
.operations().send(
new PutCompareExchangeValueOperation<>(
"emails/" + email, user.getId(), 0));
if (!cmpXchgResult.isSuccessful()) {
throw new RuntimeException("Email is already in use");
}
// At this point we managed to reserve/save the user email -
// The document can be saved in SaveChanges
session.saveChanges();
}
Implications:
-
The
User
object is saved as a document, hence it can be indexed, queried, etc. -
If
session.saveChanges
fails, the email reservation is not rolled back automatically. It is your responsibility to do so. -
The compare exchange value that was saved can be accessed from
RQL
in a query:
try (IDocumentSession session = store.openSession()) {
List<User> query = session.advanced().rawQuery(User.class,
"from Users as s where id() == cmpxchg(\"emails/ayende@ayende.com\")")
.toList();
IDocumentQuery<User> q = session.advanced()
.documentQuery(User.class)
.whereEquals("id", CmpXchg.value("emails/ayende@ayende.com"));
}
from Users as s where id() == cmpxchg("emails/ayende@ayende.com")
Example II - Reserve a Shared Resource
-
Use compare exchange for a shared resource reservation.
-
The code also checks for clients who never release resources (i.e. due to failure) by using timeout.
private class SharedResource {
private LocalDateTime reservedUntil;
public LocalDateTime getReservedUntil() {
return reservedUntil;
}
public void setReservedUntil(LocalDateTime reservedUntil) {
this.reservedUntil = reservedUntil;
}
}
public void printWork() throws InterruptedException {
// Try to get hold of the printer resource
long reservationIndex = lockResource(store, "Printer/First-Floor", Duration.ofMinutes(20));
try {
// Do some work for the duration that was set.
// Don't exceed the duration, otherwise resource is available for someone else.
} finally {
releaseResource(store, "Printer/First-Floor", reservationIndex);
}
}
public long lockResource(IDocumentStore store, String resourceName, Duration duration) throws InterruptedException {
while (true) {
LocalDateTime now = LocalDateTime.now();
SharedResource resource = new SharedResource();
resource.setReservedUntil(now.plus(duration));
CompareExchangeResult<SharedResource> saveResult =
store.operations().send(
new PutCompareExchangeValueOperation<SharedResource>(resourceName, resource, 0));
if (saveResult.isSuccessful()) {
// resourceName wasn't present - we managed to reserve
return saveResult.getIndex();
}
// At this point, Put operation failed - someone else owns the lock or lock time expired
if (saveResult.getValue().reservedUntil.isBefore(now)) {
// Time expired - Update the existing key with the new value
CompareExchangeResult<SharedResource> takeLockWithTimeoutResult =
store.operations().send(
new PutCompareExchangeValueOperation<>(resourceName, resource, saveResult.getIndex()));
if (takeLockWithTimeoutResult.isSuccessful()) {
return takeLockWithTimeoutResult.getIndex();
}
}
// Wait a little bit and retry
Thread.sleep(20);
}
}
public void releaseResource(IDocumentStore store, String resourceName, long index) {
CompareExchangeResult<SharedResource> deleteResult = store
.operations().send(
new DeleteCompareExchangeValueOperation<>(SharedResource.class, resourceName, index));
// We have 2 options here:
// deleteResult.Successful is true - we managed to release resource
// deleteResult.Successful is false - someone else took the lock due to timeout
}