Subscription Consumption Examples
In this page:
- Client with full exception handling and processing retries
- Worker with a specified batch size
- Worker that operates with a session
- Worker that processes dynamic objects
- Subscription that ends when no documents are left
- Subscription that uses included documents
- Subscription workers with failover on other nodes
- Primary and secondary workers
Client with full exception handling and processing retries
Here we implement a client that handles exceptions thrown by a worker.
If the exception is recoverable, the client retries creating the worker.
from datetime import timedelta

while True:
    options = SubscriptionWorkerOptions(subscription_name)

    # allow up to 2 hours of downtime, and wait 2 minutes between reconnection attempts
    options.max_erroneous_period = timedelta(hours=2)
    options.time_to_wait_before_connection_retry = timedelta(minutes=2)

    subscription_worker = store.subscriptions.get_subscription_worker(options, Order)

    try:
        # here we are informed of any exception that happens during processing
        subscription_worker.add_on_subscription_connection_retry(
            lambda exception: logger.error(
                f"Error during subscription processing: {subscription_name}", exc_info=exception
            )
        )

        def _process_documents_callback(batch: SubscriptionBatch[Order]):
            for item in batch.items:
                # we want to force close the subscription processing in that case
                # and let the external code decide what to do with that
                if item.result.company == "companies/2-A":
                    raise UnsupportedCompanyException(
                        "Company Id can't be 'companies/2-A', you must fix this"
                    )
                process_order(item.result)

        # Pass the callback to worker.run() and wait on the returned future,
        # so that worker errors surface in the except block below
        subscription_worker.run(_process_documents_callback).result()

        # Run will complete normally if you have closed the subscription worker
        return
    except Exception as e:
        logger.error(f"Failure in subscription: {subscription_name}", exc_info=e)

        if isinstance(
            e,
            (
                DatabaseDoesNotExistException,
                SubscriptionDoesNotExistException,
                SubscriptionInvalidStateException,
                AuthorizationException,
            ),
        ):
            raise  # not recoverable

        if isinstance(e, SubscriptionClosedException):
            # closed explicitly by admin, probably
            return

        if isinstance(e, SubscriberErrorException):
            # for UnsupportedCompanyException type, we want to throw an exception, otherwise
            # we continue processing
            if len(e.args) > 1 and isinstance(e.args[1], UnsupportedCompanyException):
                raise
            continue

        # handle this depending on subscription
        # open strategy (discussed later)
        if isinstance(e, SubscriptionInUseException):
            continue

        return
    finally:
        subscription_worker.close(False)
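The recovery decisions in the loop above can also be factored into a small pure function, which makes them easy to unit test. The following is a minimal sketch, not part of the client API: the exception classes are local stand-ins that only mirror the names used above, and `Decision` and `classify_failure` are hypothetical names introduced for illustration.

```python
from enum import Enum


# Local stand-ins that only mirror the exception names used above;
# in real code these would come from the RavenDB client.
class DatabaseDoesNotExistException(Exception): ...
class SubscriptionDoesNotExistException(Exception): ...
class SubscriptionInvalidStateException(Exception): ...
class AuthorizationException(Exception): ...
class SubscriptionClosedException(Exception): ...
class SubscriberErrorException(Exception): ...
class SubscriptionInUseException(Exception): ...
class UnsupportedCompanyException(Exception): ...


class Decision(Enum):
    RAISE = "raise"  # not recoverable, propagate to the caller
    STOP = "stop"    # leave the loop quietly
    RETRY = "retry"  # create a new worker and try again


NOT_RECOVERABLE = (
    DatabaseDoesNotExistException,
    SubscriptionDoesNotExistException,
    SubscriptionInvalidStateException,
    AuthorizationException,
)


def classify_failure(error: Exception) -> Decision:
    """Map a worker failure to the action taken by the loop above."""
    if isinstance(error, NOT_RECOVERABLE):
        return Decision.RAISE
    if isinstance(error, SubscriptionClosedException):
        return Decision.STOP
    if isinstance(error, SubscriberErrorException):
        # Assumption carried over from the loop above: the exception raised
        # by the batch callback is the second positional argument.
        inner = error.args[1] if len(error.args) > 1 else None
        if isinstance(inner, UnsupportedCompanyException):
            return Decision.RAISE
        return Decision.RETRY
    if isinstance(error, SubscriptionInUseException):
        return Decision.RETRY
    return Decision.STOP
```

With such a helper, the `except` block reduces to a single dispatch on the returned `Decision`, and the policy can be tested without a running server.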
Worker with a specified batch size
Here we create a worker and specify the maximum number of documents the server will send to the worker in each batch.
worker_w_batch = store.subscriptions.get_subscription_worker(
    SubscriptionWorkerOptions(subscription_name, max_docs_per_batch=20), Order
)

_ = worker_w_batch.run(
    process_documents=lambda batch: ...
)  # Pass your method that takes SubscriptionBatch[_T] as an argument, with your logic in it
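To make the effect of the batch size concrete, here is a small, library-free sketch of how a backlog would be divided when at most 20 documents are delivered per batch. `split_into_batches` is a hypothetical helper used for illustration only; the actual batching is done by the server, and since the setting is a maximum, real batches can be smaller.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def split_into_batches(docs: Sequence[T], max_docs_per_batch: int) -> Iterator[List[T]]:
    """Yield consecutive slices of docs, each holding at most max_docs_per_batch items."""
    if max_docs_per_batch < 1:
        raise ValueError("max_docs_per_batch must be at least 1")
    for start in range(0, len(docs), max_docs_per_batch):
        yield list(docs[start:start + max_docs_per_batch])


order_ids = [f"orders/{i}-A" for i in range(1, 46)]  # 45 pending documents
batch_sizes = [len(batch) for batch in split_into_batches(order_ids, 20)]
print(batch_sizes)  # [20, 20, 5]
```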
Worker that operates with a session
Here we create a subscription that sends Order documents that do not have a shipping date.
The worker receiving these documents will update the ShippedAt field value and save the document back to the server via the session.
from datetime import datetime

subscription_name = store.subscriptions.create_for_options(
    SubscriptionCreationOptions(query="from Orders as o where o.ShippedAt = null")
)

subscription_worker = store.subscriptions.get_subscription_worker_by_name(subscription_name, Order)


def _transfer_order_callback(batch: SubscriptionBatch[Order]):
    with batch.open_session() as session:
        for order in (item.result for item in batch.items):
            transfer_order_to_shipment_company(order)
            order.shipped_at = datetime.utcnow()

        # we know that we have at least one order to ship,
        # because the subscription query above has that in its WHERE clause
        session.save_changes()


_ = subscription_worker.run(_transfer_order_callback)
Worker that processes dynamic objects
Here we define a subscription that projects the Order documents into a dynamic format.
The worker processes the dynamic objects it receives.
subscription_name = "My dynamic subscription"

store.subscriptions.create_for_class(
    Order,
    SubscriptionCreationOptions(
        subscription_name,
        query="""
            From Orders as o
            Select
            {
                dynamic_field_1: "Company: " + o.Company + " Employee: " + o.Employee,
            }
        """,
    ),
)

subscription_worker = store.subscriptions.get_subscription_worker_by_name(subscription_name)


def _raise_notification_callback(batch: SubscriptionBatch[Order]):
    for item in batch.items:
        raise_notification(item.result.dynamic_field_1)


_ = subscription_worker.run(_raise_notification_callback)
Subscription that ends when no documents are left
Here we create a subscription client that runs only until there are no new documents left to process.
This is useful for ad-hoc, single-use processing where the user needs to ensure that all documents are fully processed.
high_value_orders_worker = store.subscriptions.get_subscription_worker(
    SubscriptionWorkerOptions(
        subs_id,
        # Here we ask the worker to stop when there are no documents left to send.
        # Will throw SubscriptionClosedException when it finishes its job
        close_when_no_docs_left=True,
    ),
    OrderAndCompany,
)

try:

    def _subscription_batch_callback(batch: SubscriptionBatch[OrderAndCompany]):
        for item in batch.items:
            send_thank_you_note_to_employee(item.result)

    # wait on the future returned by run() so that the closing exception is raised here
    high_value_orders_worker.run(_subscription_batch_callback).result()
except SubscriptionClosedException:
    # that's expected
    ...
Subscription that uses included documents
Here we create a subscription that, in addition to sending all the Order documents to the worker,
will include all the referenced Product documents in the batch sent to the worker.
When the worker accesses these Product documents, no additional requests will be made to the server.
// Create the subscription task on the server:
// ===========================================

var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
{
    // Include the referenced Product documents for each Order document
    Query = @"from Orders include Lines[].Product"
});

// Create the subscription worker that will consume the documents:
// ===============================================================

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);

_ = subscriptionWorker.Run(batch =>
{
    // Open a session via 'batch.OpenSession'
    // in order to access the Product documents
    using (var session = batch.OpenSession())
    {
        foreach (var order in batch.Items.Select(x => x.Result))
        {
            foreach (var orderLine in order.Lines)
            {
                // Calling Load will not generate a request to the server,
                // because orderLine.Product was included in the batch
                var product = session.Load<Product>(orderLine.Product);

                ProcessOrderAndProduct(order, product); // call your custom method
            }
        }
    }
});
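The example above is written with the C# client. A rough Python counterpart of the batch callback might look like the sketch below. It assumes that the Python worker exposes `batch.open_session()` and `batch.items` as in the session example earlier on this page, that `session.load(id, Product)` resolves included documents from the batch, and that the entity attributes are named `lines` and `product`. `FakeSession`, `FakeBatch`, `FakeItem` and the dataclasses are stand-ins so that the callback can run without a server.

```python
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Product:
    name: str


@dataclass
class OrderLine:
    product: str  # id of the referenced Product document


@dataclass
class Order:
    lines: List[OrderLine] = field(default_factory=list)


@dataclass
class FakeItem:
    result: Order


class FakeSession:
    """Stand-in session that serves documents from an in-memory 'included' map."""

    def __init__(self, included: Dict[str, Product]):
        self._included = included
        self.server_requests = 0  # stays 0 as long as every id was included

    def load(self, doc_id: str, object_type=None):
        if doc_id not in self._included:
            self.server_requests += 1  # a real session would go to the server here
            return None
        return self._included[doc_id]


class FakeBatch:
    """Stand-in batch exposing the two members the callback relies on."""

    def __init__(self, orders: List[Order], included: Dict[str, Product]):
        self.items = [FakeItem(order) for order in orders]
        self.session = FakeSession(included)

    @contextmanager
    def open_session(self):
        yield self.session


processed = []


def process_order_and_product(order: Order, product: Product) -> None:
    processed.append(product.name)  # replace with your own logic


def process_orders_with_products(batch) -> None:
    # Open a session via the batch in order to access the included Product documents
    with batch.open_session() as session:
        for order in (item.result for item in batch.items):
            for order_line in order.lines:
                product = session.load(order_line.product, Product)
                process_order_and_product(order, product)


batch = FakeBatch(
    orders=[Order(lines=[OrderLine("products/1-A"), OrderLine("products/2-A")])],
    included={"products/1-A": Product("Chai"), "products/2-A": Product("Chang")},
)
process_orders_with_products(batch)
print(processed)  # ['Chai', 'Chang']
```

With the real client, `process_orders_with_products` would be passed to the worker's `run` method in place of the fake batch call.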
Subscription workers with failover on other nodes
In this configuration, any available node will create a worker.
If the worker fails, another available node will take over.
worker = store.subscriptions.get_subscription_worker(
    SubscriptionWorkerOptions(subscription_name, strategy=SubscriptionOpeningStrategy.WAIT_FOR_FREE), Order
)
Primary and secondary workers
Here we create two workers:
- The primary worker, with a TAKE_OVER strategy, will take over the other worker and establish the connection.
- The secondary worker, with a WAIT_FOR_FREE strategy, will wait for the first worker to fail (due to machine failure, etc.).
The primary worker:
primary_worker = store.subscriptions.get_subscription_worker(
    SubscriptionWorkerOptions(subscription_name, strategy=SubscriptionOpeningStrategy.TAKE_OVER), Order
)

while True:
    try:
        run_future = primary_worker.run(lambda batch: ...)  # your logic
        run_future.result()  # wait for the worker to stop; its errors are raised here
    except Exception:
        ...  # retry
The secondary worker:
secondary_worker = store.subscriptions.get_subscription_worker(
    SubscriptionWorkerOptions(subscription_name, strategy=SubscriptionOpeningStrategy.WAIT_FOR_FREE), Order
)

while True:
    try:
        run_future = secondary_worker.run(lambda batch: ...)  # your logic
        run_future.result()  # wait for the worker to stop; its errors are raised here
    except Exception:
        ...  # retry
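The `# retry` placeholders above leave the retry policy open. One possible shape, sketched here without the RavenDB client, is a loop that recreates the worker after each failure and waits between attempts. `run_with_retries`, `make_worker` and `FlakyWorker` are hypothetical names; the sketch assumes only that `run()` returns a future whose `result()` re-raises worker errors, and that the worker has a `close()` method.

```python
import time
from concurrent.futures import Future
from typing import Callable


def run_with_retries(
    make_worker: Callable[[], object],
    process: Callable,
    max_attempts: int = 3,
    delay_seconds: float = 0.0,
) -> int:
    """Run a worker until it finishes, recreating it after each failure.

    Returns the number of attempts used, or re-raises the last error
    once max_attempts is exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        worker = make_worker()
        try:
            worker.run(process).result()  # block until the worker stops
            return attempt
        except Exception:
            if attempt == max_attempts:
                raise
            time.sleep(delay_seconds)
        finally:
            worker.close()
    raise ValueError("max_attempts must be at least 1")


class FlakyWorker:
    """Stand-in worker whose first two runs fail."""

    runs = 0

    def run(self, process) -> Future:
        FlakyWorker.runs += 1
        future: Future = Future()
        if FlakyWorker.runs < 3:
            future.set_exception(ConnectionError("node unavailable"))
        else:
            process(["orders/1-A"])  # hand one fake batch to the callback
            future.set_result(None)
        return future

    def close(self) -> None:
        pass


seen = []
attempts = run_with_retries(FlakyWorker, seen.extend, max_attempts=5)
print(attempts, seen)  # 3 ['orders/1-A']
```

Bounding the number of attempts is a design choice of this sketch; a long-running primary or secondary worker may prefer to retry indefinitely with a delay, as the `while True` loops above do.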