Ongoing Tasks: Elasticsearch ETL



Elasticsearch ETL

  • The following steps are required when creating an Elasticsearch ETL task:

    • Define a connection string which includes:
      • URLs to Elasticsearch nodes.
      • Authentication method required by the Elasticsearch nodes.
    • Define the Elasticsearch Indexes.
      • Indexes are used by Elasticsearch to store and locate documents.
      • The ETL task will insert new documents into the specified Elasticsearch indexes.
      • Unless Insert Only mode is enabled, existing versions of these documents will be removed from Elasticsearch before the new documents are added.
      • A document identifier property is defined per index, and is used by the delete command to locate the matching documents.
    • Define Transformation Scripts.
      The transformation script determines which RavenDB documents will be transferred, to which Elasticsearch Indexes, and in what form.
  • For a thorough step-by-step explanation:

    • Learn here to define an Elasticsearch ETL task using code.
    • Learn here to define an Elasticsearch ETL task using Studio.

Transformation Script

  • The structure and syntax of an Elasticsearch ETL transformation script are similar to those of the scripts used by the other ETL types (RavenDB ETL, SQL ETL, and OLAP ETL).
    The script is used to select the documents the task Extracts from the database, Transform the retrieved data, and Load it to the Elasticsearch destination.
    Learn about ETL transformation scripts here.

  • The script Loads data to the Elasticsearch destination using the loadTo<Target>(obj) command.

    • Target is the name of the Elasticsearch index to which the data is transferred.
      • In your task settings, define Elasticsearch Index names using lower-case characters.
        E.g. orders
      • In your transformation script, however, you can write Target using upper-case and lower-case characters, as you prefer. (The task converts the index name to lower-case characters when it connects to Elasticsearch.)
        E.g. use either loadToOrders or loadToorders.
    • obj is an object defined by the script, that will be loaded to Elasticsearch.
      E.g. orderData in the following script:
      var orderData = { DocId: id(this),
                        OrderLinesCount: this.Lines.length,
                        TotalCost: 0 };
      
      loadToOrders(orderData);
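
The sample script above loads a fixed TotalCost of 0. As a sketch of a slightly fuller transformation, the script below computes the total from the order lines. It assumes each line carries PricePerUnit, Quantity, and Discount fields, which is an assumption about the data model and not something this page specifies. To make the sketch runnable outside RavenDB, the id and loadToOrders functions, which the ETL task normally provides, are replaced by hypothetical stand-ins, and the script body is wrapped in a function that is called with the document as `this`.

```javascript
// Stand-ins for what the ETL task provides at runtime (hypothetical, for local runs only).
const loaded = [];
function id(doc) { return doc["@metadata"]["@id"]; }
function loadToOrders(obj) { loaded.push(obj); }

// The transformation script body; inside RavenDB, `this` is the source document.
function transform() {
    var orderData = {
        DocId: id(this),
        OrderLinesCount: this.Lines.length,
        TotalCost: 0
    };

    for (var i = 0; i < this.Lines.length; i++) {
        var line = this.Lines[i];
        // Assumed line fields: PricePerUnit, Quantity, Discount (a fraction between 0 and 1)
        orderData.TotalCost += line.PricePerUnit * line.Quantity * (1 - line.Discount);
    }

    loadToOrders(orderData);
}

// Run the script against a sample document.
const sampleOrder = {
    "@metadata": { "@id": "orders/1-a" },
    Lines: [
        { PricePerUnit: 10, Quantity: 2, Discount: 0 },
        { PricePerUnit: 4, Quantity: 5, Discount: 0.5 }
    ]
};
transform.call(sampleOrder);
console.log(JSON.stringify(loaded[0]));
```

In an actual task, only the body of transform would be placed in the transformation script; the stand-ins and the sample document are not needed there.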

Data Delivery


What is Transferred

An Elasticsearch ETL task transfers documents only.
Document extensions like attachments, counters, or time series, will not be transferred.


Transactions

The task delivers the data to the Elasticsearch destination in one or two calls per index.

  1. An optional _delete_by_query command, to delete existing versions of RavenDB documents from Elasticsearch before appending new ones.
    POST orders/_delete_by_query?refresh=true
    {"query":{"terms":{"DocId":["orders/1-a"]}}}
  2. A _bulk command, to append RavenDB documents to the Elasticsearch destination.
    POST orders/_bulk?refresh=wait_for
    {"index":{"_id":null}}
    {"OrderLinesCount":3,"TotalCost":0,"DocId":"orders/1-a"}
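
To make the shape of these two calls concrete, here is a minimal sketch that assembles the request line and body for a batch of transformed objects. It only illustrates the payloads shown above; buildRequests and its parameters are hypothetical names, and this is not RavenDB's actual implementation.

```javascript
// Build descriptions of the calls sent for one index (illustrative only).
function buildRequests(indexName, idProperty, docs, insertOnly) {
    const requests = [];
    const index = indexName.toLowerCase(); // index names are lower-case

    if (!insertOnly) {
        // 1. Delete previously loaded versions of the same RavenDB documents.
        const ids = docs.map(d => d[idProperty]);
        requests.push({
            line: "POST " + index + "/_delete_by_query?refresh=true",
            body: JSON.stringify({ query: { terms: { [idProperty]: ids } } })
        });
    }

    // 2. Append the current versions: one action line plus one source line per document.
    const bulkLines = [];
    for (const doc of docs) {
        bulkLines.push(JSON.stringify({ index: { _id: null } }));
        bulkLines.push(JSON.stringify(doc));
    }
    requests.push({
        line: "POST " + index + "/_bulk?refresh=wait_for",
        body: bulkLines.join("\n") + "\n" // a bulk body ends with a newline
    });

    return requests;
}

const requests = buildRequests(
    "Orders",
    "DocId",
    [{ OrderLinesCount: 3, TotalCost: 0, DocId: "orders/1-a" }],
    false
);
for (const r of requests) {
    console.log(r.line + "\n" + r.body);
}
```

Passing true as the last argument leaves out the first call, which is the behavior of Insert Only mode.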

Document Identifiers

  • When Elasticsearch stores RavenDB documents, it assigns each of them an automatically generated ID.
  • RavenDB needs to delete and replace documents, but cannot do so using Elasticsearch's arbitrarily generated IDs.
    Instead, it uses one of the document's properties as an ID.
  • You need to decide which document property RavenDB will use as the document identifier, and set it as the index's DocumentIdProperty in the task definition.
  • The identifier must be a property that the transformation script passes to Elasticsearch.
    E.g., the DocId property that is created by the script below can be used as an identifier.
    var orderData = {
                     DocId: id(this),
                     OrderLinesCount: this.Lines.length,
                     TotalCost: 0
                      };
    
    loadToOrders(orderData);
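
Because the identifier must be present on every object the script loads, it can help to check this while developing a script. The helper below is a hypothetical pre-flight check written for illustration; it is not part of RavenDB or of the ETL scripting API.

```javascript
// Hypothetical check: does the object passed to loadTo<Target> carry the identifier property?
function hasIdentifier(obj, documentIdProperty) {
    return Object.prototype.hasOwnProperty.call(obj, documentIdProperty)
        && obj[documentIdProperty] != null;
}

const orderData = { DocId: "orders/1-a", OrderLinesCount: 3, TotalCost: 0 };

console.log(hasIdentifier(orderData, "DocId"));   // true
console.log(hasIdentifier(orderData, "OrderId")); // false
```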

Insert Only Mode

You can enable the task's Insert Only mode using code or via Studio, to omit _delete_by_query commands and so refrain from deleting documents before the transfer.

Enabling Insert Only can boost the task's performance when there is no need to delete documents before loading them.

Be aware that enabling Insert Only mode will append documents to Elasticsearch whenever they are modified on RavenDB, without removing existing documents. If document versions that are not needed accumulate and storage space is a concern, keep Insert Only disabled.
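
The trade-off can be illustrated with a toy in-memory model. The array below stands in for an Elasticsearch index, and load mimics one transfer of a document version; both are hypothetical constructs for illustration only, not a real client.

```javascript
// A toy in-memory stand-in for an Elasticsearch index (hypothetical).
function createIndex() { return []; }

// Mimic one load of a document version, with or without Insert Only mode.
function load(index, idProperty, doc, insertOnly) {
    if (!insertOnly) {
        // Equivalent of _delete_by_query: drop earlier versions with the same identifier.
        for (let i = index.length - 1; i >= 0; i--) {
            if (index[i][idProperty] === doc[idProperty]) index.splice(i, 1);
        }
    }
    index.push(doc); // Equivalent of the _bulk append
}

const withDelete = createIndex();
const insertOnly = createIndex();
const versions = [
    { DocId: "orders/1-a", OrderLinesCount: 3 },
    { DocId: "orders/1-a", OrderLinesCount: 4 } // the same order after a modification
];
for (const v of versions) {
    load(withDelete, "DocId", v, false);
    load(insertOnly, "DocId", v, true);
}

console.log(withDelete.length, insertOnly.length); // prints: 1 2
```

With Insert Only disabled, only the latest version remains; with it enabled, both versions are kept.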

Elasticsearch Index Definition

  • When the Elasticsearch ETL task runs for the very first time, it will create any Elasticsearch index defined in the task that doesn't exist yet.

  • When creating the index, the task defines the document property that holds the RavenDB document ID as a non-analyzed field of type keyword, so that no full-text search is applied to it.
    This way the RavenDB document identifiers are not analyzed, and the task can run _delete_by_query using an exact match on those IDs.
    I.e.

    PUT /newIndexName
    {
      "mappings": {
          "properties": {
              "DocId": {   // the DocumentIdProperty
                  "type": "keyword"
              }
          }
       }
    }

If you choose to create the Elasticsearch index on your own (before running the Elasticsearch ETL task), you must define the type of the DocumentIdProperty field as "keyword" in your index definition.
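
If you do create the index yourself, the request body can be generated from the identifier property name. The following is a minimal sketch; buildIndexDefinition is a hypothetical helper, and it produces only the mapping shown above, with no other index settings.

```javascript
// Build the body of a "PUT /<index>" request that maps the identifier property as keyword.
function buildIndexDefinition(documentIdProperty) {
    return {
        mappings: {
            properties: {
                [documentIdProperty]: { type: "keyword" }
            }
        }
    };
}

// Print the body for an index whose DocumentIdProperty is "DocId".
console.log(JSON.stringify(buildIndexDefinition("DocId"), null, 2));
```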

Client API

Add an Elasticsearch ETL Task

  • To define an Elasticsearch ETL task through the client, use the AddEtlOperation API method as shown below.
    Pass it an ElasticSearchEtlConfiguration instance with:
    • The name of a defined Connection String.
      You can define a connection string using code or via Studio.
    • A list of Elasticsearch Indexes.
    • A list of Transformation Scripts.

Code Sample:

// Namespaces used by this sample
using Raven.Client.Documents.Operations.ETL;
using Raven.Client.Documents.Operations.ETL.ElasticSearch;

// Create an Elasticsearch ETL task
var operation = new AddEtlOperation<ElasticSearchConnectionString>(
    new ElasticSearchEtlConfiguration
    {
        ConnectionStringName = elasticSearchConnectionString.Name, // Connection String name
        Name = "ElasticsearchEtlTask", // ETL Task name

        ElasticIndexes =
        {
            // Define Elasticsearch Indexes
            new ElasticSearchIndex
            {
                // Elasticsearch Index name
                IndexName = "orders",
                // Elasticsearch identifier for transferred RavenDB documents
                // (make sure a property with this name is defined in the transform script)
                DocumentIdProperty = "DocId",
                // If true, don't send _delete_by_query before appending docs
                InsertOnlyMode = false
            },
            new ElasticSearchIndex
            {
                IndexName = "lines",
                DocumentIdProperty = "OrderLinesCount",
                InsertOnlyMode = true
            }
        },
        Transforms =
        {
            // Transformation script configuration
            new Transformation
            {
                Collections = { "Orders" }, // RavenDB collections that the script uses
                Script = @"var orderData = {
                               DocId: id(this),
                               OrderLinesCount: this.Lines.length,
                               TotalCost: 0
                           };

                           loadToOrders(orderData);",
                Name = "TransformIDsAndLinesCount" // Transformation script Name
            }
        }
    });

store.Maintenance.Send(operation);

Task Properties

  • ElasticSearchEtlConfiguration

    Property             | Type                     | Description
    Name                 | string                   | ETL task name
    ConnectionStringName | string                   | The name of the connection string used by this task
    ElasticIndexes       | List<ElasticSearchIndex> | A list of Elasticsearch indexes
    Transforms           | List<Transformation>     | A list of transformation scripts
  • ElasticSearchIndex (A list of Elasticsearch indexes)

    Property           | Type   | Description
    IndexName          | string | Elasticsearch index name. Name indexes using lower-case characters only, e.g. orders.
    DocumentIdProperty | string | The document ID property defined on the transferred document object inside the transformation script.
    InsertOnlyMode     | bool   | true: do not delete existing documents before appending new ones. false: delete existing document versions before appending documents.


Add an Elasticsearch Connection String

  • An Elasticsearch connection string includes a list of Elasticsearch destination URLs and determines the authentication method the client uses to access them.
    • Omit the Authentication property if the Elasticsearch destination requires no authentication.
    • Add a connection string as shown below.

Code Sample:

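// Namespaces used by this sample
using Raven.Client.Documents.Operations.ConnectionStrings;
using Raven.Client.Documents.Operations.ETL.ElasticSearch;

// Create a Connection String to Elasticsearch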
var elasticSearchConnectionString = new ElasticSearchConnectionString
{
    // Connection String Name
    Name = "ElasticConStr", 
    // Elasticsearch Nodes URLs
    Nodes = new[] { "http://localhost:9200" }, 
    // Authentication Method
    Authentication = new Raven.Client.Documents.Operations.ETL.ElasticSearch.Authentication 
    { 
        Basic = new BasicAuthentication
        {
            Username = "John",
            Password = "32n4j5kp8"
        }
    }
};

store.Maintenance.Send(new PutConnectionStringOperation<ElasticSearchConnectionString>(elasticSearchConnectionString));

Connection String Properties

  • ElasticSearchConnectionString (the configuration for each ETL task destination)

    Property       | Type           | Description
    Name           | string         | Connection string name
    Nodes          | string[]       | A list of URLs to Elasticsearch destinations
    Authentication | Authentication | Optional authentication method
  • Authentication (authentication methods)

    Property    | Type                      | Description
    Basic       | BasicAuthentication       | Authenticate transfers by user name and password
    ApiKey      | ApiKeyAuthentication      | Authenticate transfers by an API key
    Certificate | CertificateAuthentication | Authenticate transfers by certificate
  • BasicAuthentication (authenticate transfers by user name and password)

    Property | Type
    Username | string
    Password | string
  • ApiKeyAuthentication (authenticate transfers by an API key)

    Property | Type
    ApiKeyId | string
    ApiKey   | string
  • CertificateAuthentication (authenticate transfers by certificate)

    Property           | Type     | Description
    CertificatesBase64 | string[] | A valid certificate string

Supported Elasticsearch Versions

RavenDB supports Elasticsearch Server version 7 and up.