Ongoing Tasks: Elasticsearch ETL



Elasticsearch ETL

  • The following steps are required when creating an Elasticsearch ETL task:

    • Define a connection string which includes:
      • URLs to Elasticsearch nodes.
      • Authentication method required by the Elasticsearch nodes.
    • Define the Elasticsearch Indexes
      • Indexes are used by Elasticsearch to store and locate documents.
      • The ETL task will send new documents to the specified Elasticsearch indexes.
      • Unless Insert Only mode is enabled, existing versions of the transferred documents are deleted from Elasticsearch before the new versions are added.
      • Each transferred document carries a document identifier property, which the delete command uses to locate the matching documents.
    • Define Transformation Scripts.
      The transformation script determines which RavenDB documents will be transferred, to which Elasticsearch Indexes, and in what form.
  • For a thorough step-by-step explanation:

    • Learn how to define an Elasticsearch ETL task using code.
    • Learn how to define an Elasticsearch ETL task using Studio.

Transformation Script

  • The structure and syntax of an Elasticsearch ETL transformation script are similar to those of the scripts of all other ETL types (RavenDB ETL, SQL ETL, and OLAP ETL).
    The script defines which documents are Extracted from the database, Transforms the retrieved data, and Loads it to the Elasticsearch destination. Learn about ETL transformation scripts here.

  • The script Loads data to the Elasticsearch destination using the loadTo<Target>(obj) command.

    • Target is the name of the Elasticsearch index to which the data is transferred.
      • In the task settings:
        Define Elasticsearch Index names using only lower-case characters (as required by Elasticsearch).
        E.g. orders
      • In the transformation script:
        The target can be defined using both upper and lower-case characters.
        The task will transform the index name to all lower-case characters before sending it to Elasticsearch.
        E.g. use either loadToOrders or loadToorders.
    • obj is an object, defined by the script, that will be loaded to Elasticsearch.
      It determines the shape and contents of the document that will be created in the Elasticsearch Index.
      E.g., the following script defines the orderData object and loads it to the orders index:
      var orderData = { DocId: id(this),
                        OrderLinesCount: this.Lines.length,
                        TotalCost: 0 };
      
      loadToOrders(orderData);
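
The naming rule above can be illustrated with a minimal sketch. The helper indexNameFromLoadCall is hypothetical and is not part of RavenDB; it only mimics the lower-casing behavior described in this section, so that the mapping from a loadTo<Target> call to an index name is easy to see.

```javascript
// Hypothetical helper, for illustration only (not a RavenDB API):
// derives the Elasticsearch index name from a loadTo<Target> call name.
function indexNameFromLoadCall(functionName) {
    const prefix = "loadTo";
    if (!functionName.startsWith(prefix) || functionName.length === prefix.length) {
        throw new Error("Expected a name of the form loadTo<Target>");
    }
    // The target is lower-cased before it is sent to Elasticsearch
    return functionName.slice(prefix.length).toLowerCase();
}

console.log(indexNameFromLoadCall("loadToOrders")); // orders
console.log(indexNameFromLoadCall("loadToorders")); // orders
```

Both spellings resolve to the same index, which is why the index itself should be defined in the task settings in lower case.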

Alternative Syntax

The target index name can be passed to the loadTo command separately, as a string argument, using this syntax: loadTo('Target', obj)

  • Example:
    The following two calls to loadTo are equivalent.
    loadToOrders(obj);
    loadTo('Orders', obj);

  • The target name 'Orders' in this syntax is not a variable and cannot be used as one: it is simply a string literal of the target's name.
  • Separating the target name from the loadTo command makes it possible to include symbols like - and . in target names. This is not possible with the standard loadToOrders syntax, because special characters are not allowed in a JS function name.
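
As a rough sketch of the string syntax with a target name that contains a hyphen, the fragment below runs a script body outside RavenDB. The loadTo and id functions defined here are stand-ins written for this example (assumptions, not RavenDB code), and the index name 'orders-archive' and the sample document are hypothetical.

```javascript
// Stand-ins so the script body can run outside RavenDB.
// Both functions are assumptions made for this sketch, not RavenDB code.
const sent = [];
function loadTo(target, obj) {
    sent.push({ index: target.toLowerCase(), doc: obj });
}
function id(doc) {
    return doc["@metadata"]["@id"];
}

// Sample source document (hypothetical data)
const order = { "@metadata": { "@id": "orders/1-A" }, Lines: [{}, {}, {}] };

// The script body; in RavenDB, `this` is the source document
(function () {
    var orderData = {
        DocId: id(this),
        OrderLinesCount: this.Lines.length,
        TotalCost: 0
    };
    // 'orders-archive' could not be written as loadToorders-archive(...)
    loadTo('orders-archive', orderData);
}).call(order);

console.log(JSON.stringify(sent));
```

Inside an actual ETL task only the body of the function is needed; the stand-ins and the sample document exist solely to make the fragment runnable on its own.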

Data Delivery


What is Transferred

An Elasticsearch ETL task transfers documents only.
Document extensions like attachments, counters, or time series will not be transferred.


Document Identifiers

  • When Elasticsearch stores RavenDB documents, it provides each of them with an automatically generated ID.
  • RavenDB needs to delete and replace documents, but cannot do it using Elasticsearch's arbitrarily generated IDs.
    Instead, one of the transferred document's properties is used as ID.
  • The identifier must be a property that the transformation script passes to Elasticsearch.
    To achieve this:
    • Add a dedicated property to the transferred data structure in your script, that will hold the original RavenDB document ID.
      The property's Name can be any name of your choice.
      The property's Value must be: id(this)
    • E.g., the DocId property below is used to hold the RavenDB document ID in the transferred document.
      var orderData = {
          DocId: id(this), // document ID property
          OrderLinesCount: this.Lines.length,
          TotalCost: 0
      };
      
      loadToOrders(orderData);
  • In addition to including this property in the script, you must define it for the ETL task by setting it as the DocumentIdProperty of the relevant Elasticsearch index, using code or via Studio.

Transactions

The task delivers the data to the Elasticsearch destination in one or two calls per index.

  1. _delete_by_query:
    An optional command that deletes existing versions of RavenDB documents from Elasticsearch before the new ones are appended.
    POST orders/_delete_by_query?refresh=true
    {"query":{"terms":{"DocId":["orders/1-a"]}}}
  2. _bulk:
    Appends RavenDB documents to the Elasticsearch destination.
    POST orders/_bulk?refresh=wait_for
    {"index":{"_id":null}}
    {"OrderLinesCount":3,"TotalCost":0,"DocId":"orders/1-a"}
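
To make the shape of the two calls concrete, here is a minimal sketch that assembles the same request lines and bodies from an index name, an identifier property, and a list of documents. The function names buildDeleteByQuery and buildBulk are illustrative and are not RavenDB APIs; the sketch only builds strings and does not contact Elasticsearch.

```javascript
// Illustrative helpers (not RavenDB APIs): build the text of the two requests.
function buildDeleteByQuery(index, idProperty, docIds) {
    // Exact-match query on the document identifier property
    const body = { query: { terms: { [idProperty]: docIds } } };
    return `POST ${index}/_delete_by_query?refresh=true\n${JSON.stringify(body)}`;
}

function buildBulk(index, docs) {
    const lines = [];
    for (const doc of docs) {
        lines.push(JSON.stringify({ index: { _id: null } })); // action line
        lines.push(JSON.stringify(doc));                      // document line
    }
    // The bulk body is newline-delimited JSON and must end with a newline
    return `POST ${index}/_bulk?refresh=wait_for\n${lines.join("\n")}\n`;
}

const docs = [{ OrderLinesCount: 3, TotalCost: 0, DocId: "orders/1-a" }];
console.log(buildDeleteByQuery("orders", "DocId", docs.map(d => d.DocId)));
console.log(buildBulk("orders", docs));
```

The identifier property name is passed in as a parameter because it must match the property the transformation script actually writes; Elasticsearch field names are case-sensitive.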

Insert Only Mode

You can enable the task's Insert Only mode using code or via Studio, to omit _delete_by_query commands and so refrain from deleting documents before the transfer.

Enabling Insert Only can boost the task's performance when there is no need to delete documents before loading them.

Be aware that enabling Insert Only mode will append documents to Elasticsearch whenever they are modified on RavenDB, without removing existing documents. If document versions that are not needed accumulate and storage space is a concern, keep Insert Only disabled.
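
The difference between the two modes can be sketched with an in-memory array standing in for an Elasticsearch index. The applyBatch function is a hypothetical model written for this example; it mimics the delete-then-append sequence described above and is not how RavenDB or Elasticsearch is implemented.

```javascript
// Hypothetical model: an array stands in for an Elasticsearch index.
function applyBatch(indexDocs, newDocs, idProperty, insertOnly) {
    let kept = indexDocs;
    if (!insertOnly) {
        // Mimic _delete_by_query: drop existing versions of the incoming documents
        const ids = new Set(newDocs.map(doc => doc[idProperty]));
        kept = indexDocs.filter(doc => !ids.has(doc[idProperty]));
    }
    // Mimic _bulk: append the new versions
    return kept.concat(newDocs);
}

// Two versions of the same RavenDB document
const v1 = { DocId: "orders/1-a", OrderLinesCount: 3 };
const v2 = { DocId: "orders/1-a", OrderLinesCount: 4 };

const replaced = applyBatch(applyBatch([], [v1], "DocId", false), [v2], "DocId", false);
const appended = applyBatch(applyBatch([], [v1], "DocId", true), [v2], "DocId", true);

console.log(replaced.length); // 1
console.log(appended.length); // 2
```

With Insert Only disabled only the latest version remains, while with Insert Only enabled both versions are kept, which is the accumulation the paragraph above warns about.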

Elasticsearch Index Definition

  • When the Elasticsearch ETL task runs for the very first time, it will create any Elasticsearch index defined in the task that doesn't exist yet.

  • When the index is created, the document property that holds the RavenDB document ID will be defined as a non-analyzed field, with type keyword to avoid having full-text-search on it.
    This way the RavenDB document identifiers won't be analyzed and the task will be able to _delete_by_query using exact match on those IDs.
    I.e.

    PUT /newIndexName
    {
      "mappings": {
        "properties": {
          "DocId": {   // the DocumentIdProperty
            "type": "keyword"
          }
        }
      }
    }

If you choose to create the Elasticsearch Index on your own (before running the Elasticsearch ETL task), you must map the property named by DocumentIdProperty with the type "keyword" in your index definition.
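
If you create the index yourself, a small sketch like the following could generate the mapping body for whatever identifier property your task uses. The function name buildIndexMapping is hypothetical; the output mirrors the PUT body shown above and would still need to be sent to Elasticsearch by a client of your choice.

```javascript
// Hypothetical helper: builds the index-creation body for a given
// document identifier property, mapped as a non-analyzed keyword field.
function buildIndexMapping(documentIdProperty) {
    return {
        mappings: {
            properties: {
                [documentIdProperty]: { type: "keyword" }
            }
        }
    };
}

console.log(JSON.stringify(buildIndexMapping("DocId"), null, 2));
```

Generating the body from the property name keeps the manual index definition in step with the DocumentIdProperty configured for the task.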

Client API

Add an Elasticsearch ETL Task

  • To define an Elasticsearch ETL task through the client, use the AddEtlOperation API method as shown below.
    Pass it an ElasticSearchEtlConfiguration instance with:
    • The name of a defined Connection String.
      You can define a connection string using code or via Studio.
    • A list of Elasticsearch Indexes.
    • A list of Transformation Scripts.

Code Sample:

// Create an Elasticsearch ETL task
AddEtlOperation<ElasticSearchConnectionString> operation = new AddEtlOperation<ElasticSearchConnectionString>(
new ElasticSearchEtlConfiguration()
{
    ConnectionStringName = elasticSearchConnectionString.Name, // Connection String name
    Name = "ElasticsearchEtlTask", // ETL Task name
        
    ElasticIndexes =
    {
        // Define Elasticsearch Indexes
        new ElasticSearchIndex { // Elasticsearch Index name
                                 IndexName = "orders", 
                                 // The Elasticsearch document property that will contain
                                 // the source RavenDB document id.
                                 // Make sure this property is also defined inside the
                                 // transform script.
                                 DocumentIdProperty = "DocId", 
                                 InsertOnlyMode = false }, 
        new ElasticSearchIndex { IndexName = "lines",
                                 DocumentIdProperty = "OrderLinesCount", 
                                 // If true, don't send _delete_by_query before appending docs
                                 InsertOnlyMode = true 
                               }
    },
    Transforms =
    {   // Transformation script configuration
        new Transformation()
        {
            // RavenDB collections that the script uses
            Collections = { "Orders" }, 

            Script = @"var orderData = {
                       DocId: id(this),
                       OrderLinesCount: this.Lines.length,
                       TotalCost: 0
                       };

                       // Write the `orderData` as a document to the Elasticsearch 'orders' index
                       loadToOrders(orderData);", 
            
            // Transformation script Name
            Name = "TransformIDsAndLinesCount" 
        }
    }
});

store.Maintenance.Send(operation);

Task Properties

  • ElasticSearchEtlConfiguration

    Property | Type | Description
    Name | string | ETL Task Name
    ConnectionStringName | string | The name of the connection string used by this task
    ElasticIndexes | List<ElasticSearchIndex> | A list of Elasticsearch indexes
    Transforms | List<Transformation> | A list of transformation scripts
  • ElasticSearchIndex

    Property | Type | Description
    IndexName | string | Elasticsearch Index name. Name indexes using lower-case characters only, e.g. orders.
    DocumentIdProperty | string | The document ID property defined on the transferred document object inside the transformation script.
    InsertOnlyMode | bool | true - Do not delete existing documents before appending new ones. false - Delete existing document versions before appending documents.


Add an Elasticsearch Connection String

  • An Elasticsearch connection string includes a list of Elasticsearch destination URLs and specifies the Authentication Method required to access them.
    • Omit the Authentication property if the Elasticsearch destination requires no authentication.
    • Add a connection string as shown below.

Code Sample:

// Create a Connection String to Elasticsearch
var elasticSearchConnectionString = new ElasticSearchConnectionString
{
    // Connection String Name
    Name = "ElasticConStr", 
    // Elasticsearch Nodes URLs
    Nodes = new[] { "http://localhost:9200" }, 
    // Authentication Method
    Authentication = new Raven.Client.Documents.Operations.ETL.ElasticSearch.Authentication 
    { 
        Basic = new BasicAuthentication
        {
            Username = "John",
            Password = "32n4j5kp8"
        }
    }
};

store.Maintenance.Send(new PutConnectionStringOperation<ElasticSearchConnectionString>(elasticSearchConnectionString));

Connection String Object

  • ElasticSearchConnectionString

    Property | Type | Description
    Name | string | Connection string Name
    Nodes | string[] | A list of URLs to Elasticsearch destinations
    Authentication | Authentication | Optional authentication method (do not use when no authentication is required)
  • Authentication (Authentication methods)

    Property | Type | Description
    Basic | BasicAuthentication | Authenticate connection by username and password
    ApiKey | ApiKeyAuthentication | Authenticate connection by an API key
    Certificate | CertificateAuthentication | Authenticate connection by certificate
  • BasicAuthentication (Authenticate transfers by user name and password)

    Property | Type
    Username | string
    Password | string
  • ApiKeyAuthentication (Authenticate transfers by an API key)

    Property | Type
    ApiKeyId | string
    ApiKey | string
  • CertificateAuthentication (Authenticate transfers by certificate)

    Property | Type | Description
    CertificatesBase64 | string[] | A valid certificate string

Supported Elasticsearch Versions

RavenDB supports Elasticsearch Server version 7 and up.