Subscription Consumption Examples



Client with full exception handling and processing retries

Here we implement a client that handles exceptions thrown by a worker.
If the exception is recoverable, the client retries creating the worker.

while True:
    options = SubscriptionWorkerOptions(subscription_name)

    # allow up to 2 hours of downtime, and wait 2 minutes between reconnection attempts
    options.max_erroneous_period = timedelta(hours=2)
    options.time_to_wait_before_connection_retry = timedelta(minutes=2)

    subscription_worker = store.subscriptions.get_subscription_worker(options, Order)

    try:
        # here we are informed of any exception that happens during processing
        subscription_worker.add_on_subscription_connection_retry(
            lambda exception: logger.error(
                f"Error during subscription processing: {subscription_name}", exc_info=exception
            )
        )

        def _process_documents_callback(batch: SubscriptionBatch[Order]):
            for item in batch.items:
                # we want to force close the subscription processing in that case
                # and let the external code decide what to do with that
                if item.result.company == "companies/2-A":
                    raise UnsupportedCompanyException(
                        "Company Id can't be 'companies/2-A', you must fix this"
                    )
                process_order(item.result)

            # Run will complete normally if you have disposed the subscription
            return

        # Pass the callback to worker.run()
        subscription_worker.run(_process_documents_callback)

    except Exception as e:
        logger.error(f"Failure in subscription: {subscription_name}", exc_info=e)
        exception_type = type(e)
        if (
            exception_type is DatabaseDoesNotExistException
            or exception_type is SubscriptionDoesNotExistException
            or exception_type is SubscriptionInvalidStateException
            or exception_type is AuthorizationException
        ):
            raise  # not recoverable

        if exception_type is SubscriptionClosedException:
            # closed explicitly by admin, probably
            return

        if exception_type is SubscriberErrorException:
            # for UnsupportedCompanyException type, we want to throw an exception, otherwise
            # we continue processing
            if e.args[1] is not None and type(e.args[1]) is UnsupportedCompanyException:
                raise

            continue

        # handle this depending on subscription
        # open strategy (discussed later)
        if exception_type is SubscriptionInUseException:
            continue

        return
    finally:
        subscription_worker.close(False)
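
The branching in the except block above can also be collected into a single helper that maps an exception to an action. The following is a minimal sketch, not part of the client API: `Action` and `classify_failure` are hypothetical names, and the exception classes are local stand-ins so that the snippet runs on its own (real code would import them from the client library). It reads the inner exception from `args[1]`, mirroring the lookup used above.

```python
from enum import Enum


# Local stand-ins so the sketch runs without the client library (assumption);
# real code would import these exception classes from the client instead.
class DatabaseDoesNotExistException(Exception): ...
class SubscriptionDoesNotExistException(Exception): ...
class SubscriptionInvalidStateException(Exception): ...
class AuthorizationException(Exception): ...
class SubscriptionClosedException(Exception): ...
class SubscriptionInUseException(Exception): ...
class SubscriberErrorException(Exception): ...
class UnsupportedCompanyException(Exception): ...


class Action(Enum):  # hypothetical helper type
    RERAISE = "reraise"  # not recoverable, let the caller see the error
    STOP = "stop"        # leave the retry loop quietly
    RETRY = "retry"      # create a new worker and try again


_NOT_RECOVERABLE = (
    DatabaseDoesNotExistException,
    SubscriptionDoesNotExistException,
    SubscriptionInvalidStateException,
    AuthorizationException,
)


def classify_failure(error: Exception) -> Action:
    """Decide what the retry loop should do with a failed worker."""
    if isinstance(error, _NOT_RECOVERABLE):
        return Action.RERAISE
    if isinstance(error, SubscriptionClosedException):
        return Action.STOP  # closed explicitly, probably by an admin
    if isinstance(error, SubscriberErrorException):
        # Same lookup as the example above: the inner exception is args[1], if present.
        inner = error.args[1] if len(error.args) > 1 else None
        if isinstance(inner, UnsupportedCompanyException):
            return Action.RERAISE
        return Action.RETRY
    if isinstance(error, SubscriptionInUseException):
        return Action.RETRY
    return Action.STOP
```

Unlike the `type(e) is ...` checks above, `isinstance` also matches subclasses of the listed exceptions, which may or may not be what you want.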

Worker with a specified batch size

Here we create a worker and specify the maximum number of documents the server will send to the worker in each batch.

worker_w_batch = store.subscriptions.get_subscription_worker(
    SubscriptionWorkerOptions(subscription_name, max_docs_per_batch=20), Order
)

_ = worker_w_batch.run(
    process_documents=lambda batch: ...
)  # Pass your method that takes SubscriptionBatch[_T] as an argument, with your logic in it
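
The `lambda batch: ...` placeholder above stands for your own callback. As a rough sketch of what such a callback might look like, the snippet below defines one and exercises it against hypothetical stand-in objects (`FakeItem`, `FakeBatch`) that only mimic the `batch.items` and `item.result` attributes used on this page. They are not client classes.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class FakeItem:
    """Stand-in for a batch item; only mimics the `result` attribute (assumption)."""
    result: Any


@dataclass
class FakeBatch:
    """Stand-in for SubscriptionBatch; only mimics the `items` attribute (assumption)."""
    items: List[FakeItem] = field(default_factory=list)


processed: List[Any] = []


def process_batch(batch) -> None:
    # Your logic goes here; this sketch just records each document it sees.
    for item in batch.items:
        processed.append(item.result)


# Local dry run of the callback, without a server or a worker.
demo_batch = FakeBatch([FakeItem(f"orders/{n}-A") for n in range(1, 4)])
process_batch(demo_batch)
```

With a real worker, `process_batch` would be passed to `run()` in place of the lambda.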

Worker that operates with a session

Here we create a subscription that sends Order documents that do not have a shipping date.
The worker receiving these documents will update the ShippedAt field value and save the document back to the server via the session.

subscription_name = store.subscriptions.create_for_options(
    SubscriptionCreationOptions(query="from Orders as o where o.ShippedAt = null")
)

subscription_worker = store.subscriptions.get_subscription_worker_by_name(subscription_name, Order)

def _transfer_order_callback(batch: SubscriptionBatch[Order]):
    with batch.open_session() as session:
        for order in (item.result for item in batch.items):
            transfer_order_to_shipment_company(order)
            order.shipped_at = datetime.utcnow()

        # we know that we have at least one order to ship,
        # because the subscription query above has that in its WHERE clause
        session.save_changes()

_ = subscription_worker.run(_transfer_order_callback)

Worker that processes dynamic objects

Here we define a subscription that projects the Order documents into a dynamic format.
The worker processes the dynamic objects it receives.

subscription_name = "My dynamic subscription"
store.subscriptions.create_for_class(
    Order,
    SubscriptionCreationOptions(
        subscription_name,
        query="""
        From Orders as o
        Select 
        {
            dynamic_field_1: "Company: " + o.Company + " Employee: " + o.Employee,
        }
        """,
    ),
)

subscription_worker = store.subscriptions.get_subscription_worker_by_name(subscription_name)

def _raise_notification_callback(batch: SubscriptionBatch[Order]):
    for item in batch.items:
        raise_notification(item.result.dynamic_field_1)

_ = subscription_worker.run(_raise_notification_callback)

Subscription that ends when no documents are left

Here we create a subscription client that runs only until there are no new documents left to process.

This is useful for ad-hoc, single-use processing where the user needs to ensure that all documents are fully processed.

high_value_orders_worker = store.subscriptions.get_subscription_worker(
    SubscriptionWorkerOptions(
        subs_id,
        # Here we ask the worker to stop when there are no documents left to send.
        # The worker raises SubscriptionClosedException when it finishes its job
        close_when_no_docs_left=True,
    ),
    OrderAndCompany,
)

try:

    def _subscription_batch_callback(batch: SubscriptionBatch[OrderAndCompany]):
        for item in batch.items:
            send_thank_you_note_to_employee(item.result)

    high_value_orders_worker.run(_subscription_batch_callback)
except SubscriptionClosedException:
    # that's expected
    ...
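
If `run()` hands back a `concurrent.futures.Future` (an assumption here, suggested by the `run_future` naming used elsewhere on this page), the expected exception would surface when the caller waits on that future. The sketch below shows the waiting pattern with standard-library pieces only: `fake_run`, `wait_until_drained`, and the local `SubscriptionClosedException` class are hypothetical stand-ins, not client code.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class SubscriptionClosedException(Exception):
    """Local stand-in for the client's exception of the same name (assumption)."""


def fake_run() -> None:
    # Stand-in for the worker's background loop: pretend every document
    # was processed, then signal that the subscription was closed.
    raise SubscriptionClosedException("no documents left")


def wait_until_drained(run_future: Future) -> bool:
    """Block until the worker finishes.

    Returns True if it stopped because no documents were left,
    False if it completed without raising.
    """
    try:
        run_future.result()
    except SubscriptionClosedException:
        return True  # that's expected
    return False


with ThreadPoolExecutor(max_workers=1) as pool:
    drained = wait_until_drained(pool.submit(fake_run))
```

Any other exception still propagates out of `wait_until_drained`, so unexpected failures are not silently swallowed.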

Subscription that uses included documents

Here we create a subscription that, in addition to sending all the Order documents to the worker,
will include all the referenced Product documents in the batch sent to the worker.

When the worker accesses these Product documents, no additional requests will be made to the server.

// Create the subscription task on the server:
// ===========================================

var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
{
    // Include the referenced Product documents for each Order document
    Query = @"from Orders include Lines[].Product"
});

// Create the subscription worker that will consume the documents:
// ===============================================================

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(batch =>
{
    // Open a session via 'batch.OpenSession'
    // in order to access the Product documents
    using (var session = batch.OpenSession())
    {
        foreach (var order in batch.Items.Select(x => x.Result))
        {
            foreach (var orderLine in order.Lines)
            {
                // Calling Load will Not generate a request to the server,
                // because orderLine.Product was included in the batch
                var product = session.Load<Product>(orderLine.Product);
                
                ProcessOrderAndProduct(order, product); // call your custom method
            }
        }
    }
});

Subscription workers with failover on other nodes

In this configuration, any available node will create a worker.
If the worker fails, another available node will take over.

worker = store.subscriptions.get_subscription_worker(
    SubscriptionWorkerOptions(subscription_name, strategy=SubscriptionOpeningStrategy.WAIT_FOR_FREE), Order
)

Primary and secondary workers

Here we create two workers:

  • The primary worker, with a TAKE_OVER strategy, will take over the connection from any other worker that holds it.
  • The secondary worker, with a WAIT_FOR_FREE strategy, will wait for the first worker to fail (due to machine failure, etc.).

The primary worker:

primary_worker = store.subscriptions.get_subscription_worker(SubscriptionWorkerOptions(subscription_name, strategy=SubscriptionOpeningStrategy.TAKE_OVER), Order)

while True:
    try:
        run_future = primary_worker.run(lambda batch: ...) # your logic
    except Exception:
        ... # retry

The secondary worker:

secondary_worker = store.subscriptions.get_subscription_worker(SubscriptionWorkerOptions(subscription_name, strategy=SubscriptionOpeningStrategy.WAIT_FOR_FREE), Order)

while True:
    try:
        run_future = secondary_worker.run(lambda batch: ...) # your logic
    except Exception:
        ... # retry