Data Subscriptions: Subscription Consumption Examples



Subscription workers with failover on other nodes

In this configuration, any available node will create a worker.
If the worker fails, another available node will take over.

var worker = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.WaitForFree
    });

Worker with a specified batch size

Here we create a worker, specifying the maximum batch size we want to receive.

var workerWBatch = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        MaxDocsPerBatch = 20
    });
_ = workerWBatch.Run(x => { /* custom logic */ });

Client with full exception handling and processing retries

Here we implement a client that handles exceptions thrown by a worker and re-creates the worker when the exception is recoverable.

while (true)
{
    var options = new SubscriptionWorkerOptions(subscriptionName);

    // Allow up to 2 hours of downtime, waiting 2 minutes between reconnection attempts
    options.MaxErroneousPeriod = TimeSpan.FromHours(2);
    options.TimeToWaitBeforeConnectionRetry = TimeSpan.FromMinutes(2);

    var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(options);

    try
    {
        // Log any exception that causes the worker to retry its connection
        subscriptionWorker.OnSubscriptionConnectionRetry += exception =>
        {
            Logger.Error("Error during subscription processing: " + subscriptionName, exception);
        };

        await subscriptionWorker.Run(async batch =>
        {
            foreach (var item in batch.Items)
            {
                // In this case we force the subscription processing to stop
                // and let the calling code decide how to proceed
                if (item.Result.Company == "companies/832-A")
                    throw new UnsupportedCompanyException("Company Id can't be 'companies/832-A', you must fix this");
                await ProcessOrder(item.Result);
            }
        }, cancellationToken);

        // Run will complete normally if you have disposed the subscription
        return;
    }
    catch (Exception e)
    {
        Logger.Error("Failure in subscription: " + subscriptionName, e);

        if (e is DatabaseDoesNotExistException ||
            e is SubscriptionDoesNotExistException ||
            e is SubscriptionInvalidStateException ||
            e is AuthorizationException)
            throw; // not recoverable


        if (e is SubscriptionClosedException)
            // closed explicitly by admin, probably
            return;

        if (e is SubscriberErrorException se)
        {
            // If processing failed with UnsupportedCompanyException, rethrow;
            // otherwise, re-create the worker and continue processing
            if (se.InnerException is UnsupportedCompanyException)
            {
                throw;
            }

            continue;
        }

        // How to handle this depends on the subscription's
        // opening strategy (discussed later)
        if (e is SubscriptionInUseException)
            continue;

        return;
    }
    finally
    {
        subscriptionWorker.Dispose();
    }
}

Subscription that ends when no documents are left

Here we create a subscription client that runs only until there are no new documents left to process.

This is useful for ad-hoc, one-time processing that must be guaranteed to run to completion.

var highValueOrdersWorker = store.Subscriptions.GetSubscriptionWorker<OrderAndCompany>(
    new SubscriptionWorkerOptions(subsId)
    {
        // Ask the worker to stop when there are no documents left to send.
        // It throws SubscriptionClosedException when it finishes its job
        CloseWhenNoDocsLeft = true
    });

try
{
    await highValueOrdersWorker.Run(async batch =>
    {
        foreach (var item in batch.Items)
        {
            await SendThankYouNoteToEmployee(item.Result);
        }
    });
}
catch (SubscriptionClosedException)
{
    // that's expected
}

Worker that processes dynamic objects

Here we create a worker that processes received data as dynamic objects.

var subscriptionName = "My dynamic subscription";
await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions<Order>()
{
    Name = subscriptionName,
    Projection = order => new { DynamicField_1 = "Company: " + order.Company + " Employee: " + order.Employee }
});

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker(subscriptionName);
_ = subscriptionWorker.Run(async batch =>
{
    foreach (var item in batch.Items)
    {
        // item.Result is dynamic, so the projected field is resolved at run time
        await RaiseNotification(item.Result.DynamicField_1);
    }
});

Subscription that works with a session

Here we create a worker that receives all orders without a shipping date, hands each one to the shipment mechanism, and then updates its ShippedAt field.

var subscriptionName = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions()
{
    Query = @"from Orders as o where o.ShippedAt = null"
});

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(async batch =>
{
    using (var session = batch.OpenAsyncSession())
    {
        foreach (var order in batch.Items.Select(x => x.Result))
        {
            await TransferOrderToShipmentCompanyAsync(order);
            order.ShippedAt = DateTime.UtcNow;
        }

        // we know that we have at least one order to ship,
        // because the subscription query above has that in its WHERE clause
        await session.SaveChangesAsync();
    }
});

Subscription that uses included documents

Here we create a subscription that uses the includes feature: it processes Order documents and includes all the Products of each order.
When processing the subscription, we create a session from the SubscriptionBatch<T> object, and for each order line we load the Product document and process it alongside the Order.

var subscriptionName = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions()
{
    Query = @"from Orders include Lines[].Product"
});

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(async batch =>
{
    using (var session = batch.OpenAsyncSession())
    {
        foreach (var order in batch.Items.Select(x => x.Result))
        {
            foreach (var orderLine in order.Lines)
            {
                // this line won't generate a request, because orderLine.Product was included
                var product = await session.LoadAsync<Product>(orderLine.Product);
                await RaiseNotification(order, product);
            }
        }
    }
});

Subscription that works with the lowest-level API

Here we create a subscription that works with the blittable document representation. This can be useful in extreme high-performance scenarios, but it can be dangerous because it uses unmanaged memory directly.

var subscriptionId = await store.Subscriptions.CreateAsync(
    new SubscriptionCreationOptions<Order>
    {
        Projection = x => new
        {
            x.Employee
        }
    });

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<BlittableJsonReaderObject>(subscriptionId);
_ = subscriptionWorker.Run(async batch =>
{
    foreach (var item in batch.Items)
    {
        await RaiseNotification(item.Result["Employee"].ToString());
    }
});

Subscription workers with a primary and a secondary node

Here we create two workers:

  • The primary worker, set with a TakeOver strategy, will take the lead over the secondary worker.
  • The secondary worker, set with a WaitForFree strategy, will take over if the primary worker fails (e.g. due to a machine failure).

The primary worker:

while (true)
{
    // A worker instance can be run only once, so create a new one for every attempt
    var primaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.TakeOver
    });

    try
    {
        await primaryWorker.Run(x =>
        {
            // your logic
        });
    }
    catch (Exception)
    {
        // retry
    }
    finally
    {
        primaryWorker.Dispose();
    }
}

The secondary worker:

while (true)
{
    // A worker instance can be run only once, so create a new one for every attempt
    var secondaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.WaitForFree
    });

    try
    {
        await secondaryWorker.Run(x =>
        {
            // your logic
        });
    }
    catch (Exception)
    {
        // retry
    }
    finally
    {
        secondaryWorker.Dispose();
    }
}