Subscription Consumption Examples



Client with full exception handling and processing retries

Here we implement a client that handles exceptions thrown by the worker.
If the exception is recoverable, the client retries creating the worker.

while (true)
{
    // Create the worker:
    // ==================
    var options = new SubscriptionWorkerOptions(subscriptionName);

    // Configure the worker:
    // Allow a downtime of up to 2 hours,
    // and wait 2 minutes before reconnecting
    options.MaxErroneousPeriod = TimeSpan.FromHours(2);
    options.TimeToWaitBeforeConnectionRetry = TimeSpan.FromMinutes(2);

    subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(options);

    try
    {
        // Subscribe to connection retry events
        // and log any exceptions that occur during processing
        subscriptionWorker.OnSubscriptionConnectionRetry += exception =>
        {
            Logger.Error("Error during subscription processing: " + subscriptionName,
                exception);
        };

        // Run the worker:
        // ===============
        await subscriptionWorker.Run(batch =>
        {
            foreach (var item in batch.Items)
            {
                // Stop subscription processing if the order's company is "companies/2-A"
                // by throwing an exception, so that external logic can handle this case
                if (item.Result.Company == "companies/2-A")
                {
                    // The custom exception thrown from here
                    // will be wrapped by `SubscriberErrorException`
                    throw new UnsupportedCompanyException(
                        "Company ID can't be 'companies/2-A', please fix");
                }

                // Process the order document - provide your own logic
                ProcessOrder(item.Result);
            }
        }, cancellationToken);

        // The Run method will stop if the subscription worker is disposed,
        // exiting the while loop
        return;
    }
    catch (Exception e)
    {
        Logger.Error("Failure in subscription: " + subscriptionName, e);

        // The following exceptions are not recoverable, so rethrow them
        if (e is DatabaseDoesNotExistException ||
            e is SubscriptionDoesNotExistException ||
            e is SubscriptionInvalidStateException ||
            e is AuthorizationException)
            throw;


        if (e is SubscriptionClosedException)
            // Subscription probably closed explicitly by admin
            return;

        if (e is SubscriberErrorException se)
        {
            // For UnsupportedCompanyException we want to throw an exception,
            // otherwise, continue processing
            if (se.InnerException is UnsupportedCompanyException)
            {
                throw;
            }

            // Call continue to skip the current while(true) iteration and try reconnecting
            // in the next one, allowing the worker to process future batches.
            continue;
        }

        // Handle this depending on the subscription opening strategy
        if (e is SubscriptionInUseException)
            continue;

        // Call return to exit the while(true) loop,
        // dispose the worker (via finally), and stop the subscription.
        return;
    }
    finally
    {
        subscriptionWorker.Dispose();
    }
}

Worker with a specified batch size

Here we create a worker and specify the maximum number of documents the server will send to the worker in each batch.

var workerWBatch = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        MaxDocsPerBatch = 20
    });

_ = workerWBatch.Run(x =>
{
    // your custom logic 
});

Worker that operates with a session

Here we create a subscription that sends Order documents that do not have a shipping date.
The worker receiving these documents will update the ShippedAt field value and save the document back to the server via the session.

Note:
The session is opened with batch.OpenSession instead of with Store.OpenSession.

// Create the subscription task on the server:
// ===========================================

var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
{
    Query = @"from Orders as o where o.ShippedAt = null"
});

// Create the subscription worker that will consume the documents:
// ===============================================================

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(batch =>
{
    // Open a session with 'batch.OpenSession'
    using (var session = batch.OpenSession())
    {
        foreach (var order in batch.Items.Select(x => x.Result))
        {
            TransferOrderToShipmentCompany(order); // call your custom method 
            order.ShippedAt = DateTime.UtcNow;     // update the document field
        }

        // Save the updated Order documents
        session.SaveChanges();
    }
});

Worker that processes dynamic objects

Here we define a subscription that projects the Order documents into a dynamic format.
The worker processes the dynamic objects it receives.

// Create the subscription task on the server:
// ===========================================

var subscriptionName = "My dynamic subscription";
store.Subscriptions.Create(new SubscriptionCreationOptions<Order>()
{
    Name = subscriptionName,
    Projection = order => 
        new { DynamicField_1 = "Company: " + order.Company + " Employee: " + order.Employee }
});

// Create the subscription worker that will consume the documents:
// ===============================================================

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker(subscriptionName);
_ = subscriptionWorker.Run(batch =>
{
    foreach (var item in batch.Items)
    {
        // Access the dynamic field in the document
        dynamic field = item.Result.DynamicField_1;
        
        // Call your custom method
        ProcessItem(field); 
    }
});

Worker that processes a blittable object

Here we create a worker that processes documents as low-level blittable objects.
This can be useful in extreme high-performance scenarios, but it may be dangerous because blittable objects use unmanaged memory directly.

// Create the subscription task on the server:
// ===========================================

var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions<Order>
{
    Projection = x => new
    {
        x.Employee
    }
});

// Create the subscription worker that will consume the documents:
// ===============================================================

var subscriptionWorker = 
    // Specify `BlittableJsonReaderObject` as the generic type parameter
    store.Subscriptions.GetSubscriptionWorker<BlittableJsonReaderObject>(subscriptionName);

_ = subscriptionWorker.Run(batch =>
{
    foreach (var item in batch.Items)
    {
        // Access the Employee field within the blittable object
        var employeeField = item.Result["Employee"].ToString();
        
        ProcessItem(employeeField); // call your custom method 
    }
});

Subscription that ends when no documents are left

Here we create a subscription client that runs until there are no more new documents to process.
This is useful for ad-hoc, single-use processing where the user needs to ensure that all documents are fully processed.

// Create the subscription task on the server:
// ===========================================
var subscriptionName = store.Subscriptions.Create<Order>(
    new SubscriptionCreationOptions<Order>
    {
        Filter = order => order.Lines.Sum(line => line.PricePerUnit * line.Quantity) > 10000,
        Projection = order => new OrderAndCompany
        {
            OrderId = order.Id,
            Company = RavenQuery.Load<Company>(order.Company)
        }
    });

// Create the subscription worker that will consume the documents:
// ===============================================================
var highValueOrdersWorker = store.Subscriptions.GetSubscriptionWorker<OrderAndCompany>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        // Set the worker to stop when there are no more documents left to send.
        // The worker will throw SubscriptionClosedException when it finishes its job.
        CloseWhenNoDocsLeft = true
    });

try
{
    await highValueOrdersWorker.Run(batch =>
    {
        foreach (var item in batch.Items)
        {
            SendThankYouNoteToEmployee(item.Result); // call your custom method 
        }
    });
}
catch (SubscriptionClosedException)
{
    // That's expected, no more documents to process
}
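
To see which orders a filter like the one above would select, the same predicate can be evaluated locally on in-memory data. This is only a sketch: the tuples below are illustrative stand-ins for the `Order` class, and the IDs and prices are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in data (not the real Order class):
// each order has an ID and an array of (PricePerUnit, Quantity) lines
var orders = new List<(string Id, (decimal PricePerUnit, int Quantity)[] Lines)>
{
    ("orders/1-A", new[] { (100m, 50), (250m, 30) }),  // total: 12,500
    ("orders/2-A", new[] { (20m, 10), (5m, 100) }),    // total: 700
    ("orders/3-A", new[] { (10000m, 1) })              // total: 10,000 (not above the threshold)
};

// Same predicate shape as the subscription Filter:
// keep an order only if the sum of its line totals is greater than 10000
var highValueOrderIds = orders
    .Where(o => o.Lines.Sum(line => line.PricePerUnit * line.Quantity) > 10000)
    .Select(o => o.Id)
    .ToList();

Console.WriteLine(string.Join(", ", highValueOrderIds));
```

Because the comparison is strict, an order whose total is exactly 10000 is not selected.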

Subscription that uses included documents

Here we create a subscription that, in addition to sending all the Order documents to the worker,
will include all the referenced Product documents in the batch sent to the worker.

When the worker accesses these Product documents, no additional requests will be made to the server.

// Create the subscription task on the server:
// ===========================================

var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
{
    // Include the referenced Product documents for each Order document
    Query = @"from Orders include Lines[].Product"
});

// Create the subscription worker that will consume the documents:
// ===============================================================

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(batch =>
{
    // Open a session via 'batch.OpenSession'
    // in order to access the Product documents
    using (var session = batch.OpenSession())
    {
        foreach (var order in batch.Items.Select(x => x.Result))
        {
            foreach (var orderLine in order.Lines)
            {
                // Calling Load will not generate a request to the server,
                // because orderLine.Product was included in the batch
                var product = session.Load<Product>(orderLine.Product);
                
                ProcessOrderAndProduct(order, product); // call your custom method
            }
        }
    }
});

Subscription workers with failover on other nodes

In this configuration, any available node will create a worker.
If the worker fails, another available node will take over.

var worker = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.WaitForFree
    });

Primary and secondary workers

Here we create two workers:

  • The primary worker, with a TakeOver strategy, will take over the other worker and establish the connection.
  • The secondary worker, with a WaitForFree strategy, will wait for the first worker to fail (due to machine failure, etc.).

The primary worker:

var primaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.TakeOver
    });

while (true)
{
    try
    {
        await primaryWorker.Run(x =>
        {
            // your logic
        });
    }
    catch (Exception)
    {
        // retry
    }
}

The secondary worker:

var secondaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.WaitForFree
    });

while (true)
{
    try
    {
        await secondaryWorker.Run(x =>
        {
            // your logic
        });
    }
    catch (Exception)
    {
        // retry
    }
}
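
The `// retry` placeholders in the two loops above start the next attempt immediately after any exception. The following is a minimal sketch of one way to pause between attempts and to stop on errors that are not worth retrying. `RunWithRetriesAsync` and its parameters are hypothetical names, not part of the RavenDB client API, and the failing delegate only simulates a worker run.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not a RavenDB API):
// calls runOnce until it completes without an exception.
// Exceptions rejected by isRecoverable, and cancellation, propagate to the caller.
// Returns the number of attempts that were made.
async Task<int> RunWithRetriesAsync(
    Func<Task> runOnce,
    Func<Exception, bool> isRecoverable,
    TimeSpan delayBetweenAttempts,
    CancellationToken token = default)
{
    var attempts = 0;
    while (true)
    {
        token.ThrowIfCancellationRequested();
        attempts++;
        try
        {
            await runOnce();
            return attempts; // completed without an exception
        }
        catch (Exception e) when (isRecoverable(e))
        {
            // Pause before the next attempt instead of retrying immediately
            await Task.Delay(delayBetweenAttempts, token);
        }
    }
}

// Usage with a stand-in delegate that fails twice and then succeeds
var calls = 0;
var attemptsMade = await RunWithRetriesAsync(
    runOnce: () =>
    {
        calls++;
        return calls < 3
            ? Task.FromException(new TimeoutException("simulated failure"))
            : Task.CompletedTask;
    },
    isRecoverable: e => e is TimeoutException,
    delayBetweenAttempts: TimeSpan.FromMilliseconds(10));

Console.WriteLine($"Succeeded after {attemptsMade} attempts");
```

In a real client, `runOnce` would presumably create a worker, await its `Run` call, and dispose the worker, so that each attempt starts from a fresh worker. `isRecoverable` could return false for the exception types treated as not recoverable in the first example above.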