Consume Subscriptions API



Create the subscription worker

Create a subscription worker using get_subscription_worker or get_subscription_worker_by_name.

  • Use the get_subscription_worker method to specify the subscription options while creating the worker.
  • Use the get_subscription_worker_by_name method to create the worker using the default options.

def get_subscription_worker(
    self, options: SubscriptionWorkerOptions, object_type: Optional[Type[_T]] = None, database: Optional[str] = None
) -> SubscriptionWorker[_T]: ...

def get_subscription_worker_by_name(
    self,
    subscription_name: Optional[str] = None,
    object_type: Optional[Type[_T]] = None,
    database: Optional[str] = None,
) -> SubscriptionWorker[_T]: ...
Parameter Type Description
options SubscriptionWorkerOptions Options that affect how the worker interacts with the subscription. These options do not alter the definition of the subscription itself.
object_type (Optional) Type[_T] Defines the object type (class) for the items that will be included in the received SubscriptionBatch object.
database (Optional) str The name of the database where the subscription task resides. If None, the default database configured in DocumentStore will be used.
subscription_name (Optional) str The subscription's name. Used when the worker is created without a SubscriptionWorkerOptions instance, relying on the default option values.
Return value
SubscriptionWorker The subscription worker that has been created.
Initially, it is idle and will only start processing documents when the run function is called.
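
The by-name path described above can be sketched as follows. This is a minimal illustration, not the client's implementation: `FakeSubscriptions`, `Order`, and the name "OrdersSubscription" are hypothetical stand-ins so the example runs without a server. With a real DocumentStore, the factory methods are assumed to be reachable through `store.subscriptions`, which the signatures above do not show.

```python
from dataclasses import dataclass
from typing import Any, Optional, Type


@dataclass
class Order:
    """Hypothetical document class passed as object_type."""
    company: str = ""


class FakeSubscriptions:
    """Stand-in for the object exposing the worker factory methods.

    It only records the call; the real client returns a SubscriptionWorker.
    """

    def get_subscription_worker_by_name(
        self,
        subscription_name: Optional[str] = None,
        object_type: Optional[Type[Any]] = None,
        database: Optional[str] = None,
    ) -> dict:
        return {
            "subscription_name": subscription_name,
            "object_type": object_type,
            "database": database,
            "running": False,  # a new worker is idle until run is called
        }


def create_orders_worker(subscriptions: Any) -> Any:
    # Default worker options; database is left as None, which means
    # the default database configured in the store is used.
    return subscriptions.get_subscription_worker_by_name(
        subscription_name="OrdersSubscription",
        object_type=Order,
    )


worker = create_orders_worker(FakeSubscriptions())
print(worker["subscription_name"], worker["running"])
```

Passing the factory object into `create_orders_worker` keeps the sketch independent of how the store is configured.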

SubscriptionWorkerOptions

When creating a worker with SubscriptionWorkerOptions, the only mandatory property is subscription_name.
All other properties are optional and fall back to their default values when not specified.

Member Type Description
subscription_name str The name of the subscription to which the worker will connect.
time_to_wait_before_connection_retry timedelta The time to wait before attempting to reconnect after a non-aborting failure during subscription processing. Default: 5 seconds.
ignore_subscriber_errors bool Determines if subscription processing is aborted when the worker's batch-handling code throws an unhandled exception.

True – subscription processing will continue.

False (Default) – subscription processing will be aborted.
max_docs_per_batch int The maximum number of documents that the server will try to retrieve and send to the client in a batch. If the server doesn't find as many documents as specified, it will send the documents it has found without waiting. Default: 4096.
close_when_no_docs_left bool Determines whether the subscription connection closes when no new documents are available.

True – The subscription worker processes all available documents and stops when none remain, at which point the run method throws a SubscriptionClosedException.
Useful for ad-hoc, one-time processing.

False (Default) – The subscription worker remains active, waiting for new documents.
send_buffer_size_in_bytes int The size in bytes of the TCP socket buffer used for sending data.
Default: 32,768 bytes (32 KiB).
receive_buffer_size_in_bytes int The size in bytes of the TCP socket buffer used for receiving data.
Default: 4,096 bytes (4 KiB).
strategy SubscriptionOpeningStrategy
(enum)
Configures how the server handles connection attempts from workers to a specific subscription task.
Learn more in worker strategies.
Default: OPEN_IF_FREE.
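
The close_when_no_docs_left behavior can be sketched as a one-time "drain" of a subscription. This is a hedged illustration only: `FakeOneTimeWorker` and the local `SubscriptionClosedException` class are stand-ins defined here, and the sketch assumes that the exception surfaces when the Future returned by run is awaited with `result()`.

```python
from concurrent.futures import Future
from typing import Callable, List


class SubscriptionClosedException(Exception):
    """Local stand-in for the client's exception of the same name."""


class FakeOneTimeWorker:
    """Hypothetical worker behaving as if close_when_no_docs_left=True."""

    def __init__(self, batches: List[List[str]]):
        self._batches = batches

    def run(self, process_documents: Callable[[List[str]], None]) -> Future:
        future: Future = Future()
        for batch in self._batches:
            process_documents(batch)
        # No documents remain: the run future fails with the closed exception.
        future.set_exception(SubscriptionClosedException("no documents left"))
        return future


def drain(worker, process_documents) -> bool:
    """Run the worker to completion; True means it closed because it ran dry."""
    try:
        worker.run(process_documents).result()
    except SubscriptionClosedException:
        return True
    return False


seen: List[str] = []
finished = drain(FakeOneTimeWorker([["orders/1", "orders/2"], ["orders/3"]]), seen.extend)
print(finished, seen)
```

Treating the closed exception as the normal end of the run, rather than as a failure, is what makes this option convenient for ad-hoc processing.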

SubscriptionOpeningStrategy
OPEN_IF_FREE Connect if no other worker is connected
WAIT_FOR_FREE Wait for currently connected worker to disconnect
TAKE_OVER Take over the connection
CONCURRENT Connect concurrently
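
The four strategies can be summarized as a small decision function. This is a toy model of the table above, not server code: the `Strategy` enum is a local mirror of the member names, and the "reject" outcome for OPEN_IF_FREE on a busy subscription is an assumption inferred from "connect if no other worker is connected".

```python
from enum import Enum, auto


class Strategy(Enum):
    """Local mirror of the SubscriptionOpeningStrategy member names."""
    OPEN_IF_FREE = auto()
    WAIT_FOR_FREE = auto()
    TAKE_OVER = auto()
    CONCURRENT = auto()


def on_connection_attempt(strategy: Strategy, worker_already_connected: bool) -> str:
    """Describe what happens to a new worker's connection attempt."""
    if not worker_already_connected:
        return "connect"  # a free subscription accepts any strategy
    if strategy is Strategy.OPEN_IF_FREE:
        return "reject"  # assumption: the attempt fails while another worker is connected
    if strategy is Strategy.WAIT_FOR_FREE:
        return "wait until the connected worker disconnects"
    if strategy is Strategy.TAKE_OVER:
        return "take over the connection"
    return "connect concurrently"


for s in Strategy:
    print(s.name, "->", on_connection_attempt(s, worker_already_connected=True))
```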

Run the subscription worker

A newly created subscription worker is idle and does not process any documents.
To start processing, call the run function of the SubscriptionWorker.

The run function receives the client-side code as a function that will process the received document batches.

def run(self, process_documents: Optional[Callable[[SubscriptionBatch[_T]], Any]]) -> Future: ...
Parameter Type Description
process_documents (Optional) Callable[[SubscriptionBatch[_T]], Any] Function (delegate) that processes each received batch synchronously.
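
The run contract (pass a batch-processing function, get a Future back) can be sketched like this. `FakeWorker` is a hypothetical stand-in that feeds canned batches from a background thread; it is not the client's SubscriptionWorker, and its batches are plain lists of document IDs rather than SubscriptionBatch objects.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List


class FakeWorker:
    """Hypothetical stand-in for SubscriptionWorker: feeds canned batches."""

    def __init__(self, batches: List[List[str]]):
        self._batches = batches
        self._executor = ThreadPoolExecutor(max_workers=1)

    def run(self, process_documents: Callable[[List[str]], None]) -> Future:
        def loop() -> None:
            for batch in self._batches:
                process_documents(batch)

        # Processing happens on a background thread; the caller gets a Future.
        return self._executor.submit(loop)

    def close(self) -> None:
        self._executor.shutdown(wait=True)


received: List[str] = []

worker = FakeWorker([["orders/1", "orders/2"], ["orders/3"]])
future = worker.run(received.extend)  # nothing is processed before this call
future.result(timeout=5)              # block until the canned batches are done
worker.close()
print(received)
```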

SubscriptionBatch[_T]

Member Type Description
items List of SubscriptionBatch[_T].Item The items in the batch
number_of_items_in_batch int Number of items in the batch

def number_of_items_in_batch(self) -> int:
    return 0 if self.items is None else len(self.items)

Subscription worker connectivity

As long as there is no exception, the worker will continue addressing the same server that the first batch was received from.
If the worker fails to reach that node, it will try to fail over to another node from the session's topology list.
The node that the worker succeeds in connecting to will inform the worker which node is currently responsible for data subscriptions.
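
The failover order described above can be modeled with a few lines of plain Python. This is a toy sketch under stated assumptions, not the client's connection logic: the function name, the node tags, and the `reachable` set are all hypothetical.

```python
from typing import Iterable, List, Set, Tuple


def connect_with_failover(
    last_node: str,
    topology: Iterable[str],
    reachable: Set[str],
    responsible_node: str,
) -> Tuple[str, str]:
    """Return (node that answered, node the worker should address next)."""
    # Try the node used so far first, then the rest of the topology in order.
    candidates: List[str] = [last_node] + [n for n in topology if n != last_node]
    for node in candidates:
        if node in reachable:
            # Any node that answers reports which node is responsible
            # for the subscription.
            return node, responsible_node
    raise ConnectionError("no node in the topology could be reached")


print(connect_with_failover("A", ["A", "B", "C"], {"B", "C"}, "C"))
```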

class SubscriptionBatch(Generic[_T]):
    class Item(Generic[_T_Item]):
        """
        Represents a single item in a subscription batch results.
        This class should be used only inside the subscription's run delegate,
        using it outside this scope might cause unexpected behavior.
        """

        def __init__(self):
            self._result: Optional[_T_Item] = None
            self._exception_message: Optional[str] = None
            self._key: Optional[str] = None
            self._change_vector: Optional[str] = None
            self._projection: Optional[bool] = None
            self._revision: Optional[bool] = None
            self.raw_result: Optional[Dict] = None
            self.raw_metadata: Optional[Dict] = None
            self._metadata: Optional[MetadataAsDictionary] = None
SubscriptionBatch[_T].Item

Member Type Description
_result (Optional) _T_Item Current batch item
_exception_message (Optional) str Message of the exception thrown on the server side while processing the current document
_key (Optional) str Document ID of the current batch item
_change_vector (Optional) str Change vector of the current batch item's underlying document
_projection (Optional) bool Indicates whether the value is a projection
raw_result (Optional) Dict Current batch item before it is converted to _T
raw_metadata (Optional) Dict Metadata of the current batch item's underlying document
_metadata (Optional) MetadataAsDictionary Metadata values of the current batch item

Usage of raw_result, raw_metadata, and _metadata values outside of the document processing delegate is not supported.
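
A batch handler that respects this restriction copies whatever it needs while still inside the delegate. The sketch below uses hypothetical stand-ins (`FakeItem`, `FakeBatch`); the public attribute names `key`, `result`, and `exception_message` are assumptions mirroring the private members listed above, not confirmed client properties.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class FakeItem:
    """Stand-in for SubscriptionBatch.Item; attribute names are assumptions."""
    key: str
    result: Optional[dict] = None
    exception_message: Optional[str] = None


@dataclass
class FakeBatch:
    """Stand-in for SubscriptionBatch."""
    items: Optional[List[FakeItem]] = None

    def number_of_items_in_batch(self) -> int:
        return 0 if self.items is None else len(self.items)


processed: Dict[str, dict] = {}
failed: List[str] = []


def process_documents(batch: FakeBatch) -> None:
    for item in batch.items or []:
        if item.exception_message is not None:
            # The server reported an error for this document.
            failed.append(item.key)
            continue
        # Copy the data out; do not keep a reference to the item itself.
        processed[item.key] = dict(item.result or {})


batch = FakeBatch(items=[
    FakeItem("orders/1", {"total": 10}),
    FakeItem("orders/2", exception_message="script error"),
])
process_documents(batch)
print(batch.number_of_items_in_batch(), processed, failed)
```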

SubscriptionWorker[_T]


Methods:

Method Return Type Description
close(wait_for_subscription_task: bool = True) None Aborts the subscription worker's operation ungracefully, waiting for the task returned by the run function to finish running.
run Future[None] Call run to begin the worker's batch processing. Pass the batch-processing delegate to this method (see above).

Events:

Event Type / Return type Description
after_acknowledgment Callable[[SubscriptionBatch[_T]], None] Event invoked after each time the server acknowledges batch processing progress.
after_acknowledgment Parameters
batch SubscriptionBatch[_T] The batch whose processing was acknowledged
Return value
Future[None] The worker waits for the task to finish the event processing

Properties:

Member Type Description
current_node_tag str The node tag of the current RavenDB server handling the subscription.
subscription_name str The name of the currently processed subscription.