Sharding: ETL



ETL

Sharded and Non-Sharded ETL Tasks

From a user's point of view, creating an ongoing ETL process is done by defining and running a single ETL task, just as it is done on a non-sharded database.

Behind the scenes, though, each shard defines and uses its own ETL task to send data from its database to the ETL destination independently of the other shards.

Distributing the ETL responsibility between the shards allows RavenDB to keep its ETL destination updated with data additions and modifications no matter how large the overall database gets.

Non-Sharded Database ETL Tasks

  • A complete replica of the database is kept by each cluster node.
  • The cluster can therefore make any node responsible for ETL.
  • The responsible node runs the ETL task periodically to update the ETL destination with any data changes.

Sharded Database ETL Tasks

  • Each shard hosts a unique dataset, so no single node can monitor the entire database.
  • When a user defines an ETL task, either via Studio or using API operations like PutConnectionStringOperation and AddEtlOperation, the resulting change in the database record triggers each shard to create an ETL task of its own, based on the user-defined task.
    This creation of multiple ETL tasks, one per shard, is automatic and requires no additional action from the user.
  • Each shard appoints one of its nodes to be responsible for executing the shard's ETL task.
  • Each shard's ETL task behaves just as the ETL task of a non-sharded database would: Extracting relevant data from the shard's database, Transforming it using a user-defined script, and Loading it to the destination.
  • If the responsible node fails, a failover process starts: another node of the same shard is made responsible for the task, and the transfer continues from the point of failure.
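As a sketch of what defining the task can look like from the C# client, the example below uses PutConnectionStringOperation and AddEtlOperation to define a RavenDB ETL task. Note that nothing in it is shard-specific: the code is the same as for a non-sharded database, and the per-shard tasks are created by the server. The server URL, database names, connection string name, task name, collection, and transform script are all hypothetical placeholders, and the example assumes a reachable server hosting the sharded database.

```csharp
using Raven.Client.Documents;
using Raven.Client.Documents.Operations.ConnectionStrings;
using Raven.Client.Documents.Operations.ETL;

// Hypothetical server URL and sharded database name; replace with your own
using var store = new DocumentStore
{
    Urls = new[] { "http://localhost:8080" },
    Database = "ShardedDatabase"
};
store.Initialize();

// Connection string to a hypothetical destination database
var connectionString = new RavenConnectionString
{
    Name = "etl-destination",
    Database = "DestinationDatabase",
    TopologyDiscoveryUrls = new[] { "http://localhost:8080" }
};
store.Maintenance.Send(
    new PutConnectionStringOperation<RavenConnectionString>(connectionString));

// A single, user-defined ETL task (hypothetical name, collection, and script)
var etlConfiguration = new RavenEtlConfiguration
{
    Name = "employees-etl",
    ConnectionStringName = "etl-destination",
    Transforms =
    {
        new Transformation
        {
            Name = "Script #1",
            Collections = { "Employees" },
            // Transform script: load a reduced copy of each employee document
            Script = "loadToEmployees({ Name: this.FirstName + ' ' + this.LastName, Title: this.Title });"
        }
    }
};

// Adding the task changes the database record; each shard then
// creates its own ETL task based on this single definition
var result = store.Maintenance.Send(
    new AddEtlOperation<RavenConnectionString>(etlConfiguration));
```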

ETL Queries

Queries used by an ETL task's transform script on a sharded database are essentially no different from queries executed over a non-sharded database.
However, some querying features are not yet implemented on a sharded database (e.g., loading a document that resides on a different shard will fail), and others behave a little differently than their non-sharded equivalents (e.g., filter). It is therefore recommended to read the pages dedicated to indexing and querying on a sharded database.

ETL and Resharding

An ETL task may send the same data more than once.
One scenario in which this can happen is resharding: a document can be sent by its shard's ETL task, resharded, and then sent again to the ETL destination by its new shard's ETL task.

Some ETL destinations replace the former copy of a document with the incoming duplicate. Others, like OLAP and Queue ETL destinations, do not automatically recognize such events.
It is the user's responsibility to verify that the loaded documents are handled as expected when they arrive.

OLAP helps users detect duplicates using lastModified; see a more thorough discussion of this here and relevant code samples here.
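Because destinations such as OLAP may receive the same document twice, deduplication has to happen on the consuming side. The sketch below is a minimal, hypothetical illustration in plain C# (it uses no RavenDB APIs): it assumes each loaded row carries the source document ID and a last-modified timestamp, and keeps only the most recently modified version of each document. The row layout, field names, and values are illustrative assumptions, not the actual OLAP output schema.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical rows as they might arrive at a destination that does not
// deduplicate on its own. After resharding, "orders/1-A" was loaded twice:
// once by its old shard's ETL task and once by its new shard's ETL task.
var incoming = new List<(string Id, DateTime LastModified, string Payload)>
{
    ("orders/1-A", new DateTime(2024, 1, 1, 10, 0, 0, DateTimeKind.Utc), "old shard copy"),
    ("orders/2-A", new DateTime(2024, 1, 1, 10, 5, 0, DateTimeKind.Utc), "only copy"),
    ("orders/1-A", new DateTime(2024, 1, 1, 11, 0, 0, DateTimeKind.Utc), "new shard copy"),
};

// Group rows by document ID and keep the one with the latest LastModified value
static Dictionary<string, (DateTime LastModified, string Payload)> LatestPerId(
    IEnumerable<(string Id, DateTime LastModified, string Payload)> rows) =>
    rows.GroupBy(r => r.Id)
        .ToDictionary(
            g => g.Key,
            g => g.OrderByDescending(r => r.LastModified)
                  .Select(r => (r.LastModified, r.Payload))
                  .First());

var deduplicated = LatestPerId(incoming);

// Print the surviving version of each document
foreach (var (id, row) in deduplicated)
    Console.WriteLine($"{id}: {row.Payload} ({row.LastModified:O})");
```

Comparing timestamps rather than arrival order matters here, since the copy sent by the old shard is not guaranteed to arrive first.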

Retrieving Shard-Specific ETL Task Info

  • The GetOngoingTaskInfoOperation store operation can be used on a non-sharded database to retrieve a task's information.

  • GetOngoingTaskInfoOperation can also be used on a sharded database.

    • Get Task Info Per Database
      Run GetOngoingTaskInfoOperation using store.Maintenance.Send to retrieve information regarding the basic task defined by the user.
      The information includes the task's name and ID.
    • Get Task Info Per Shard
      Run GetOngoingTaskInfoOperation using store.Maintenance.ForShard(x).Send, where x is the shard number, to retrieve information about the selected shard's task.
      The per-shard result contains much more information, such as details of the responsible and mentor nodes.
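      using Raven.Client.Documents.Operations.OngoingTasks;
      
      // 'store' is an initialized DocumentStore for the sharded database,
      // and 'name' is the name of the user-defined ETL task
      
      // Get basic info regarding the user-defined task
      var ongoingTask = store.Maintenance.Send(
                      new GetOngoingTaskInfoOperation(name, OngoingTaskType.RavenEtl));
      
      if (ongoingTask != null)
      {
          // This example assumes the database has 3 shards (0, 1, 2)
          const int numberOfShards = 3;
          var shardName = new string[numberOfShards];
          var nodeTag = new string[numberOfShards];
          var mentorNode = new string[numberOfShards];
          var ongoingTaskConnectionStatus = new OngoingTaskConnectionStatus[numberOfShards];
      
          // Go through the shards and retrieve each shard's task info
          for (int i = 0; i < numberOfShards; i++)
          {
              var singleShardInfo = store.Maintenance.ForShard(i).Send(
                      new GetOngoingTaskInfoOperation(name, OngoingTaskType.RavenEtl));
      
              shardName[i] = singleShardInfo.TaskName;
              // The node currently responsible for this shard's ETL task
              nodeTag[i] = singleShardInfo.ResponsibleNode?.NodeTag;
              mentorNode[i] = singleShardInfo.MentorNode;
              ongoingTaskConnectionStatus[i] = singleShardInfo.TaskConnectionStatus;
          }
      }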