Consume Subscriptions API
Create the subscription worker
Create a subscription worker using get_subscription_worker or get_subscription_worker_by_name.
- Use the get_subscription_worker method to specify the subscription options while creating the worker.
- Use the get_subscription_worker_by_name method to create the worker using the default options.
def get_subscription_worker(
    self, options: SubscriptionWorkerOptions, object_type: Optional[Type[_T]] = None, database: Optional[str] = None
) -> SubscriptionWorker[_T]: ...

def get_subscription_worker_by_name(
    self,
    subscription_name: Optional[str] = None,
    object_type: Optional[Type[_T]] = None,
    database: Optional[str] = None,
) -> SubscriptionWorker[_T]: ...
Parameter | Type | Description |
---|---|---|
options | SubscriptionWorkerOptions | Options that affect how the worker interacts with the subscription. These options do not alter the definition of the subscription itself. |
object_type (Optional) | Type[_T] | Defines the object type (class) for the items that will be included in the received SubscriptionBatch object. |
database (Optional) | str | The name of the database where the subscription task resides. If None, the default database configured in DocumentStore will be used. |
subscription_name (Optional) | str | The subscription's name. Used when the worker is created without a SubscriptionWorkerOptions instance, relying on the default values. |

Return value | Description |
---|---|
SubscriptionWorker | The subscription worker that has been created. Initially, it is idle and will only start processing documents when the run function is called. |
SubscriptionWorkerOptions
When creating a worker with SubscriptionWorkerOptions, the only mandatory property is subscription_name.
All other members are optional and fall back to their default values if not specified.
Member | Type | Description |
---|---|---|
subscription_name | str | The name of the subscription to which the worker will connect. |
time_to_wait_before_connection_retry | timedelta | The time to wait before attempting to reconnect after a non-aborting failure during subscription processing. Default: 5 seconds. |
ignore_subscriber_errors | bool | Determines whether subscription processing is aborted when the worker's batch-handling code throws an unhandled exception. True – subscription processing will continue. False (Default) – subscription processing will be aborted. |
max_docs_per_batch | int | The maximum number of documents that the server will try to retrieve and send to the client in a batch. If the server doesn't find as many documents as specified, it will send the documents it has found without waiting. Default: 4096. |
close_when_no_docs_left | bool | Determines whether the subscription connection closes when no new documents are available. True – The subscription worker processes all available documents and stops when none remain, at which point the run method throws a SubscriptionClosedException. Useful for ad-hoc, one-time processing. False (Default) – The subscription worker remains active, waiting for new documents. |
send_buffer_size_in_bytes | int | The size in bytes of the TCP socket buffer used for sending data. Default: 32,768 bytes (32 KiB). |
receive_buffer_size_in_bytes | int | The size in bytes of the TCP socket buffer used for receiving data. Default: 4,096 bytes (4 KiB). |
strategy | SubscriptionOpeningStrategy (enum) | Configures how the server handles connection attempts from workers to a specific subscription task. Learn more in worker strategies. Default: OPEN_IF_FREE. |
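The defaults listed in the table can be summarized in a small stand-in class. This is only a sketch: `WorkerOptionsSketch` is a hypothetical name that mirrors the documented members and defaults, it is not the client's SubscriptionWorkerOptions class, and the strategy is represented here by the plain name of the enum member.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass
class WorkerOptionsSketch:
    """Hypothetical mirror of the documented members; not the real client class."""

    subscription_name: str  # the only mandatory member
    time_to_wait_before_connection_retry: timedelta = timedelta(seconds=5)
    ignore_subscriber_errors: bool = False
    max_docs_per_batch: int = 4096
    close_when_no_docs_left: bool = False
    send_buffer_size_in_bytes: int = 32 * 1024
    receive_buffer_size_in_bytes: int = 4096
    strategy: str = "OPEN_IF_FREE"  # name of the SubscriptionOpeningStrategy member


# Only the subscription name is required; any other member can be overridden.
options = WorkerOptionsSketch("OrdersSubscription", max_docs_per_batch=20)
```

Every member that is not passed explicitly keeps the default shown in the table.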
Learn more about SubscriptionOpeningStrategy in worker strategies.
SubscriptionOpeningStrategy | Description |
---|---|
OPEN_IF_FREE | Connect if no other worker is connected |
WAIT_FOR_FREE | Wait for the currently connected worker to disconnect |
TAKE_OVER | Take over the connection |
CONCURRENT | Connect concurrently |
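The strategies above can be read as a decision the server makes for each incoming worker. The following is a simplified sketch, not the server's implementation: `StrategySketch` and `connection_outcome` are hypothetical names, and the "reject" outcome for OPEN_IF_FREE is an assumption about what happens when another worker is already connected.

```python
from enum import Enum, auto


class StrategySketch(Enum):
    """Hypothetical stand-in for SubscriptionOpeningStrategy."""

    OPEN_IF_FREE = auto()
    WAIT_FOR_FREE = auto()
    TAKE_OVER = auto()
    CONCURRENT = auto()


def connection_outcome(strategy: StrategySketch, other_worker_connected: bool) -> str:
    """Describe what happens to an incoming worker under each strategy."""
    if not other_worker_connected:
        return "connect"  # a free subscription accepts the worker in every case
    if strategy is StrategySketch.OPEN_IF_FREE:
        return "reject"  # assumption: the incoming worker is refused
    if strategy is StrategySketch.WAIT_FOR_FREE:
        return "wait"  # wait until the connected worker disconnects
    if strategy is StrategySketch.TAKE_OVER:
        return "take over"  # replace the currently connected worker
    return "connect concurrently"  # CONCURRENT: share the subscription


outcome = connection_outcome(StrategySketch.WAIT_FOR_FREE, other_worker_connected=True)
```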
Run the subscription worker
A newly created subscription worker does not process any documents yet.
To start processing, call the run function of the SubscriptionWorker.
The run function receives the client-side code, as a function, that will process the received document batches.
def run(self, process_documents: Optional[Callable[[SubscriptionBatch[_T]], Any]]) -> Future: ...
Parameter | Type | Description |
---|---|---|
process_documents (Optional) | Callable[[SubscriptionBatch[_T]], Any] | Delegate for synchronous batch processing |
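The calling pattern implied by the run signature (pass a callable, get a Future back) can be illustrated without a server. This is a sketch under stated assumptions: `BatchSketch` and `WorkerSketch` are hypothetical stand-ins fed from an in-memory list, and the items are plain strings rather than the client's item objects.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List


class BatchSketch:
    """Hypothetical stand-in for SubscriptionBatch; exposes only items."""

    def __init__(self, items: List[str]) -> None:
        self.items = items


class WorkerSketch:
    """Hypothetical stand-in for SubscriptionWorker; stays idle until run is called."""

    def __init__(self, batches: List[BatchSketch]) -> None:
        self._batches = batches

    def _loop(self, process_documents: Callable[[BatchSketch], Any]) -> None:
        for batch in self._batches:
            process_documents(batch)

    def run(self, process_documents: Callable[[BatchSketch], Any]) -> Future:
        executor = ThreadPoolExecutor(max_workers=1)
        future = executor.submit(self._loop, process_documents)
        executor.shutdown(wait=False)  # the submitted loop still runs to completion
        return future


seen: List[str] = []


def process_documents(batch: BatchSketch) -> None:
    # Client-side code: invoked once per received batch.
    for item in batch.items:
        seen.append(item)


worker = WorkerSketch([BatchSketch(["orders/1", "orders/2"]), BatchSketch(["orders/3"])])
future = worker.run(process_documents)
future.result()  # blocks until processing ends; re-raises errors from the callback
```

Holding on to the returned Future lets the caller wait for processing to end or observe an exception raised inside the callback.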
SubscriptionBatch[_T]
Member | Type | Description |
---|---|---|
items | SubscriptionBatch[_T].Item array | List of items in the batch |
number_of_items_in_batch | int | Number of items in the batch |

def number_of_items_in_batch(self) -> int:
    return 0 if self.items is None else len(self.items)
Subscription worker connectivity
As long as there is no exception, the worker will continue addressing the same server from which the first batch was received.
If the worker fails to reach that node, it will try to fail over to another node from the session's topology list.
The node that the worker succeeds in connecting to will inform the worker which node is currently responsible for data subscriptions.
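The failover behavior described here can be sketched as a loop over the topology. This is an illustration only, not the client's actual connection logic: `connect_with_failover` and the `try_connect` callback are hypothetical, and the cluster is simulated with three node tags.

```python
from typing import Callable, List


def connect_with_failover(
    preferred: str, topology: List[str], try_connect: Callable[[str], str]
) -> str:
    """Return the tag of the node reported as responsible for the subscription.

    try_connect(node) is a hypothetical callback: it returns the responsible
    node's tag on success and raises ConnectionError if the node is unreachable.
    """
    # Try the node used so far first, then the remaining topology nodes in order.
    candidates = [preferred] + [node for node in topology if node != preferred]
    for node in candidates:
        try:
            return try_connect(node)
        except ConnectionError:
            continue  # unreachable: fail over to the next node
    raise ConnectionError("no node in the topology could be reached")


# Simulated cluster: node A is down; node B answers and points to node C.
attempts: List[str] = []


def fake_try_connect(node: str) -> str:
    attempts.append(node)
    if node == "A":
        raise ConnectionError("node A is down")
    return "C"


responsible = connect_with_failover("A", ["A", "B", "C"], fake_try_connect)
```

In this simulation the worker first tries A, fails over to B, and learns from B that C is the responsible node.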
class SubscriptionBatch(Generic[_T]):
    class Item(Generic[_T_Item]):
        """
        Represents a single item in a subscription batch results.
        This class should be used only inside the subscription's run delegate,
        using it outside this scope might cause unexpected behavior.
        """

        def __init__(self):
            self._result: Optional[_T_Item] = None
            self._exception_message: Optional[str] = None
            self._key: Optional[str] = None
            self._change_vector: Optional[str] = None
            self._projection: Optional[bool] = None
            self._revision: Optional[bool] = None
            self.raw_result: Optional[Dict] = None
            self.raw_metadata: Optional[Dict] = None
            self._metadata: Optional[MetadataAsDictionary] = None
SubscriptionBatch[_T].Item member | Type | Description |
---|---|---|
_result (Optional) | _T_Item | Current batch item |
_exception_message (Optional) | str | Message of the exception thrown on the server side while processing the current document |
_key (Optional) | str | Underlying document ID of the current batch item |
_change_vector (Optional) | str | Change vector of the current batch item's underlying document |
_projection (Optional) | bool | Indicates whether the value is a projection |
raw_result (Optional) | Dict | Current batch item before deserialization to _T |
raw_metadata (Optional) | Dict | Underlying document metadata of the current batch item |
_metadata (Optional) | MetadataAsDictionary | Underlying metadata values of the current batch item |
Usage of raw_result, raw_metadata, and _metadata values outside of the document processing delegate is not supported.
SubscriptionWorker[_T]
Methods:
Method | Return Type | Description |
---|---|---|
close(wait_for_subscription_task: bool = True) | None | Aborts the subscription worker operation ungracefully, waiting for the task returned by the run function to finish running. |
run | Future[None] | Call run to begin the worker's batch processing. Pass the batch processing delegate to this method (see above). |
Events:
Event | Type / Return type | Description |
---|---|---|
after_acknowledgment | Callable[[SubscriptionBatch[_T]], None] | Event invoked each time the server acknowledges batch processing progress. |

after_acknowledgment Parameters | Type | Description |
---|---|---|
batch | SubscriptionBatch[_T] | The batch whose processing was acknowledged |

Return value | Description |
---|---|
Future[None] | The worker waits for the task to finish the event processing |
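The ordering implied by after_acknowledgment (process the batch, acknowledge it, then invoke the event) can be sketched as follows. `AckWorkerSketch` is a hypothetical stand-in: how handlers are registered on the real worker is not shown in this page, so the plain list used here is an assumption, and the acknowledgment round trip to the server is skipped.

```python
from typing import Callable, List


class AckWorkerSketch:
    """Hypothetical stand-in; not the client's SubscriptionWorker."""

    def __init__(self) -> None:
        # Assumption: handlers are kept in a plain list for this sketch.
        self.after_acknowledgment: List[Callable[[List[str]], None]] = []

    def handle_batch(
        self, batch: List[str], process_documents: Callable[[List[str]], None]
    ) -> None:
        process_documents(batch)
        # The real worker would acknowledge the batch to the server at this
        # point; this sketch skips the network step.
        for handler in self.after_acknowledgment:
            handler(batch)


log: List[str] = []
worker = AckWorkerSketch()
worker.after_acknowledgment.append(
    lambda batch: log.append(f"acknowledged {len(batch)} items")
)
worker.handle_batch(["orders/1", "orders/2"], lambda batch: log.append("processed"))
```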
Properties:
Member | Type | Description |
---|---|---|
current_node_tag | str | The node tag of the current RavenDB server handling the subscription. |
subscription_name | str | The name of the currently processed subscription. |