Data Subscriptions: Subscription Consumption Examples



Subscription workers with failover on other nodes

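In this configuration, a worker can be created on any available node.
With the WaitForFree strategy, a worker that finds the subscription already in use waits until it is free, so if the active worker fails, a worker on another available node takes over.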

var worker = store.Subscriptions.GetSubscriptionWorker<Order>
        (new SubscriptionWorkerOptions(subscriptionName)
        {
            Strategy = SubscriptionOpeningStrategy.WaitForFree
        });
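
The snippet above only creates the worker; documents start flowing once Run is called. Below is a minimal sketch of a complete consumer, assuming an initialized IDocumentStore and an existing subscription. The Order class is a trimmed stand-in for the entity used on this page, and WaitForFreeExample, ConsumeAsync and HandleOrder are hypothetical names introduced for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Subscriptions;

// Trimmed stand-in for the Order entity used on this page (assumption).
public class Order
{
    public string Id { get; set; }
    public string Company { get; set; }
}

public static class WaitForFreeExample
{
    // Creates a WaitForFree worker and consumes batches until Run completes or throws.
    public static async Task ConsumeAsync(IDocumentStore store, string subscriptionName)
    {
        using (var worker = store.Subscriptions.GetSubscriptionWorker<Order>(
            new SubscriptionWorkerOptions(subscriptionName)
            {
                Strategy = SubscriptionOpeningStrategy.WaitForFree
            }))
        {
            await worker.Run(batch =>
            {
                foreach (var item in batch.Items)
                {
                    HandleOrder(item.Result);
                }
            });
        }
    }

    // Hypothetical handler; replace with real processing logic.
    private static void HandleOrder(Order order)
    {
        Console.WriteLine("Processing order " + order.Id);
    }
}
```

Running the same method on several nodes would give the failover behavior described above, since each worker waits for the subscription to become free.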

Worker with a specified batch size

Here we create a worker, specifying the maximum batch size we want to receive.

var workerWBatch = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        MaxDocsPerBatch = 20
    });
_ = workerWBatch.Run(x => { /* custom logic */ });

Client with full exception handling and processing retries

Here we implement a client that handles exceptions thrown by the worker, and retries creating the worker if the exception is recoverable.

while (true)
{
    var options = new SubscriptionWorkerOptions(subscriptionName);

    // allow up to 2 hours of downtime, and wait 2 minutes between reconnection attempts
    options.MaxErroneousPeriod = TimeSpan.FromHours(2);
    options.TimeToWaitBeforeConnectionRetry = TimeSpan.FromMinutes(2);

    subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(options);

    try
    {
        // get notified of any error that causes the worker to retry its connection
        subscriptionWorker.OnSubscriptionConnectionRetry += exception =>
        {
            Logger.Error("Error during subscription processing: " + subscriptionName, exception);
        };

        await subscriptionWorker.Run(async batch =>
        {
            foreach (var item in batch.Items)
            {
                // we want to force close the subscription processing in that case
                // and let the external code decide what to do with that
                if (item.Result.Company == "companies/832-A")
                    throw new UnsupportedCompanyException("Company Id can't be 'companies/832-A', you must fix this");
                await ProcessOrder(item.Result);
            }
        }, cancellationToken);

        // Run will complete normally if you have disposed the subscription
        return;
    }
    catch (Exception e)
    {
        Logger.Error("Failure in subscription: " + subscriptionName, e);

        if (e is DatabaseDoesNotExistException ||
            e is SubscriptionDoesNotExistException ||
            e is SubscriptionInvalidStateException ||
            e is AuthorizationException)
            throw; // not recoverable


        if (e is SubscriptionClosedException)
            // closed explicitly by admin, probably
            return;

        if (e is SubscriberErrorException se)
        {
            // rethrow if the batch handler failed with UnsupportedCompanyException;
            // otherwise, recreate the worker and continue processing
            if (se.InnerException is UnsupportedCompanyException)
            {
                throw;
            }

            continue;
        }

        // handle this depending on subscription
        // open strategy (discussed later)
        if (e is SubscriptionInUseException)
            continue;

        return;
    }
    finally
    {
        subscriptionWorker.Dispose();
    }
}
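
The catch block above is effectively a decision table: each exception maps to rethrow, stop, or retry. One way to make that table explicit and unit-testable is sketched below. FailureAction and FailurePolicy are hypothetical helpers written for this example; they are not part of the RavenDB client, and the sketch depends only on the base class library.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper types, not part of the RavenDB client.
public enum FailureAction { Rethrow, Stop, Retry }

public sealed class FailurePolicy
{
    // Rules are evaluated in registration order; the first match wins.
    private readonly List<Func<Exception, FailureAction?>> _rules =
        new List<Func<Exception, FailureAction?>>();
    private readonly FailureAction _fallback;

    public FailurePolicy(FailureAction fallback)
    {
        _fallback = fallback;
    }

    // Maps every exception of type TException to the given action.
    public FailurePolicy On<TException>(FailureAction action)
        where TException : Exception
    {
        return On<TException>(_ => true, action);
    }

    // Maps exceptions of type TException that also satisfy the predicate.
    public FailurePolicy On<TException>(Func<TException, bool> predicate, FailureAction action)
        where TException : Exception
    {
        _rules.Add(e =>
        {
            if (e is TException typed && predicate(typed))
                return action;
            return (FailureAction?)null;
        });
        return this;
    }

    // Returns the action of the first matching rule, or the fallback.
    public FailureAction Decide(Exception e)
    {
        foreach (var rule in _rules)
        {
            var action = rule(e);
            if (action.HasValue)
                return action.Value;
        }
        return _fallback;
    }
}
```

To mirror the loop above, the four non-recoverable exception types would be registered with Rethrow, SubscriptionClosedException with Stop, SubscriberErrorException with Rethrow when its InnerException is UnsupportedCompanyException and Retry otherwise, SubscriptionInUseException with Retry, and Stop as the fallback. The catch block then reduces to a switch over policy.Decide(e).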

Subscription that ends when no documents are left

Here we create a subscription client that runs only until there are no more new documents left to process.

This is useful for ad hoc, single-use processing that must be guaranteed to run to completion.

var highValueOrdersWorker = store.Subscriptions.GetSubscriptionWorker<OrderAndCompany>(
    new SubscriptionWorkerOptions(subsId)
    {
        // Ask the worker to stop when there are no documents left to send.
        // Run will throw SubscriptionClosedException when it finishes its job
        CloseWhenNoDocsLeft = true
    });

try
{
    await highValueOrdersWorker.Run(async batch =>
    {
        foreach (var item in batch.Items)
        {
            await SendThankYouNoteToEmployee(item.Result);
        }
    });
}
catch (SubscriptionClosedException)
{
    // that's expected
}

Worker that processes dynamic objects

Here we create a worker that processes received data as dynamic objects.

var subscriptionName = "My dynamic subscription";
await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions<Order>()
{
    Name = subscriptionName,
    Projection = order => new { DynamicField_1 = "Company: " + order.Company + " Employee: " + order.Employee }
});

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker(subscriptionName);
_ = subscriptionWorker.Run(async batch =>
{
    foreach (var item in batch.Items)
    {
        await RaiseNotification(item.Result.DynamicField_1);
    }
});

Subscription that works with a session

Here we create a worker that receives all orders without a shipping date, lets the shipment mechanism handle each one, and updates its ShippedAt field.

var subscriptionName = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions()
{
    Query = @"from Orders as o where o.ShippedAt = null"
});

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(async batch =>
{
    using (var session = batch.OpenAsyncSession())
    {
        foreach (var order in batch.Items.Select(x => x.Result))
        {
            await TransferOrderToShipmentCompanyAsync(order);
            order.ShippedAt = DateTime.UtcNow;
        }

        // we know that we have at least one order to ship,
        // because the subscription query above has that in its WHERE clause
        await session.SaveChangesAsync();
    }
});

Subscription that uses included documents

Here we create a subscription that uses the includes feature: it processes Order documents and includes all Products of each order.
When processing the subscription, we open a session from the SubscriptionBatch<T> object and, for each order line, load the Product document and process it alongside the Order.

var subscriptionName = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions()
{
    Query = @"from Orders include Lines[].Product"
});

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(async batch =>
{
    using (var session = batch.OpenAsyncSession())
    {
        foreach (var order in batch.Items.Select(x => x.Result))
        {
            foreach (var orderLine in order.Lines)
            {
                // this line won't generate a request, because orderLine.Product was included
                var product = await session.LoadAsync<Product>(orderLine.Product);
                await RaiseNotification(order, product);
            }
        }
    }
});

Subscription that works with the lowest-level API

Here we create a subscription that works with the blittable document representation. This can be useful in very high-performance scenarios, but it may be dangerous because it uses unmanaged memory directly.

var subscriptionId = await store.Subscriptions.CreateAsync(
    new SubscriptionCreationOptions<Order>
    {
        Projection = x => new
        {
            x.Employee
        }
    });

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<BlittableJsonReaderObject>(subscriptionId);
_ = subscriptionWorker.Run(async batch =>
{
    foreach (var item in batch.Items)
    {
        await RaiseNotification(item.Result["Employee"].ToString());
    }
});

Subscription workers with a primary and a secondary node

Here we create two workers:

  • The primary worker, set with a TakeOver strategy, will take the lead over the secondary worker.
  • The secondary worker, set with a WaitForFree strategy, will take over if the primary worker fails (e.g. due to a machine failure).

The primary worker:

while (true)
{
    // create a fresh worker for each attempt, as in the retry example above
    var primaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.TakeOver
    });

    try
    {
        await primaryWorker.Run(x =>
        {
            // your logic
        });
    }
    catch (Exception)
    {
        // retry
    }
    finally
    {
        primaryWorker.Dispose();
    }
}

The secondary worker:

while (true)
{
    // create a fresh worker for each attempt, as in the retry example above
    var secondaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.WaitForFree
    });

    try
    {
        await secondaryWorker.Run(x =>
        {
            // your logic
        });
    }
    catch (Exception)
    {
        // retry
    }
    finally
    {
        secondaryWorker.Dispose();
    }
}
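
The primary and secondary loops differ only in the opening strategy, so they could share one helper. The following is a sketch under stated assumptions rather than a recommended implementation: FailoverWorkers and RunWithStrategy are hypothetical names, and the cancellation token is an addition that lets the host stop the loop.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Subscriptions;

public static class FailoverWorkers
{
    // Runs a worker with the given strategy and recreates it after a failure,
    // until the token is cancelled.
    public static async Task RunWithStrategy<T>(
        IDocumentStore store,
        string subscriptionName,
        SubscriptionOpeningStrategy strategy,
        Func<SubscriptionBatch<T>, Task> processBatch,
        CancellationToken token) where T : class
    {
        while (token.IsCancellationRequested == false)
        {
            using (var worker = store.Subscriptions.GetSubscriptionWorker<T>(
                new SubscriptionWorkerOptions(subscriptionName) { Strategy = strategy }))
            {
                try
                {
                    await worker.Run(processBatch, token);
                }
                catch (OperationCanceledException) when (token.IsCancellationRequested)
                {
                    // the host asked us to stop
                    return;
                }
                catch (Exception)
                {
                    // retry with a fresh worker
                }
            }
        }
    }
}
```

The primary process would call RunWithStrategy with SubscriptionOpeningStrategy.TakeOver and the secondary with SubscriptionOpeningStrategy.WaitForFree, passing the same batch handler to both.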