Sharding: Querying



Querying a sharded database

From a user's point of view, querying a sharded RavenDB database is similar to querying a non-sharded database:
the query syntax is the same, and the same results are returned in the same format.

To make this possible, the following steps take place when a client sends a query to a sharded database:

  • The query is received by a RavenDB server that has been appointed as an orchestrator.
    The orchestrator mediates all communication between the client and the database shards.
  • The orchestrator distributes the query to the shards.
  • Each shard runs the query over its own database, using its own indexes.
    When the data is retrieved, the shard transfers it to the orchestrator.
  • The orchestrator combines the data it received from all shards into a single dataset, and may perform additional operations over it.
    For example, when a map-reduce index is queried, each shard returns results that have already been reduced by its own copy of the index.
    Once the orchestrator has received the data from all shards, it reduces the full dataset once again.
  • Finally, the orchestrator returns the combined dataset to the client.
  • The client remains unaware that it has just communicated with a sharded database.
    Note, however, that this process is costly compared with the simple data retrieval performed by a non-sharded database.
    Sharding is therefore recommended only when the database has grown to a substantial size and complexity.
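To make the flow above concrete, here is a minimal in-memory model written in plain C#. It is a sketch, not RavenDB code: the shard contents are hypothetical, each shard is represented by the partial (Name, Sum) results its own map-reduce index would return, and `QueryShardedDatabase` is a hypothetical stand-in for the orchestrator, which gathers those partial results and reduces them once more.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shards: each one holds partial (Name, Sum) results that its
// own copy of a map-reduce index has already reduced.
var shards = new List<(string Name, int Sum)[]>
{
    new[] { ("apples", 4), ("pears", 1) },   // shard 0
    new[] { ("apples", 2), ("plums", 3) },   // shard 1
    new[] { ("pears", 5) },                  // shard 2
};

// Hypothetical orchestrator: gather every shard's partial results,
// then reduce the combined dataset once again (group by key, sum values).
List<(string Name, int Sum)> QueryShardedDatabase() =>
    shards
        .SelectMany(shardResults => shardResults)
        .GroupBy(r => r.Name)
        .Select(g => (Name: g.Key, Sum: g.Sum(r => r.Sum)))
        .OrderBy(r => r.Name, StringComparer.Ordinal)
        .ToList();

var result = QueryShardedDatabase();
foreach (var (name, sum) in result)
    Console.WriteLine($"{name}: {sum}");   // prints apples: 6, then pears: 6, then plums: 3
```

The client only ever sees the final, re-reduced list; the per-shard partial values never reach it.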

Querying selected shards

  • A query is normally executed over all shards. However, it is also possible to query only selected shards.
    Querying a specific shard directly saves the orchestrator unnecessary trips to other shards.

  • This approach can be useful, for example, when documents are intentionally stored on the same shard using Anchoring documents.

  • To query specific shards using a pre-defined sharding prefix, see: Querying selected shards by prefix.


  • Use the ShardContext method together with ByDocumentId or ByDocumentIds to specify which shard(s) to query.

  • To identify which shard to query, RavenDB passes the document ID that you provide to ByDocumentId/ByDocumentIds to its hashing algorithm, which determines the bucket ID and thus the shard.

  • The document ID parameter does not need to be one of the documents you are querying for;
    it is only used to determine the target shard. See the following examples:
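The ID-to-shard routing described above (document ID → hash → bucket → shard) can be sketched as follows. This only illustrates the idea and is not RavenDB's implementation: FNV-1a is a stand-in hash, and the bucket count, the even split of bucket ranges across three shards, and the helpers `GetBucket`, `GetShard`, and `ShardsFor` are all hypothetical. IDs are lower-cased first because RavenDB document IDs are case-insensitive.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// ASSUMPTIONS: bucket count, shard count, hash function, and the
// bucket-range-to-shard layout are illustrative, not RavenDB's real values.
const int BucketCount = 1 << 20;
const int ShardCount = 3;

// Hash the (case-insensitive) document ID into a bucket, using FNV-1a as a stand-in hash.
int GetBucket(string documentId)
{
    uint hash = 2166136261;                  // FNV-1a 32-bit offset basis
    foreach (byte b in Encoding.UTF8.GetBytes(documentId.ToLowerInvariant()))
    {
        hash ^= b;
        hash *= 16777619;                    // FNV-1a 32-bit prime
    }
    return (int)(hash % BucketCount);
}

// Hypothetical layout: equal, contiguous bucket ranges per shard.
int GetShard(int bucket) => Math.Min(bucket / (BucketCount / ShardCount), ShardCount - 1);

// Like ByDocumentIds: route each ID to its shard; IDs that land on the same shard collapse.
SortedSet<int> ShardsFor(IEnumerable<string> documentIds) =>
    new SortedSet<int>(documentIds.Select(id => GetShard(GetBucket(id))));

Console.WriteLine($"companies/1 -> shard {GetShard(GetBucket("companies/1"))}");
Console.WriteLine("companies/2, companies/3 -> shards " +
    string.Join(", ", ShardsFor(new[] { "companies/2", "companies/3" })));
```

Because routing depends only on the ID string, the ID does not have to belong to a document of the type being queried; it only selects the shard.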

Query a selected shard:

Query only the shard containing document companies/1:

Query (sync session):
// Query for 'User' documents from a specific shard:
// =================================================
var userDocuments = session.Query<User>()
     // Call 'ShardContext' to select which shard to query
     // RavenDB will query only the shard containing document "companies/1"
    .Customize(x => x.ShardContext(s => s.ByDocumentId("companies/1")))
     // The query predicate
    .Where(x => x.Name == "Joe")
    .ToList();

// Variable 'userDocuments' will include all documents of type 'User'
// that match the query predicate and reside on the shard containing document 'companies/1'.

// Query for ALL documents from a specific shard:
// ==============================================
var allDocuments = session.Query<object>() // query with <object>
    .Customize(x => x.ShardContext(s => s.ByDocumentId("companies/1")))
    .ToList();

// Variable 'allDocuments' will include ALL documents
// that reside on the shard containing document 'companies/1'.

Query (async session):
// Query for 'User' documents from a specific shard:
// =================================================
var userDocuments = await asyncSession.Query<User>()
     // Call 'ShardContext' to select which shard to query
    .Customize(x => x.ShardContext(s => s.ByDocumentId("companies/1")))
     // The query predicate
    .Where(x => x.Name == "Joe")
    .ToListAsync();

// Query for ALL documents from a specific shard:
// ==============================================
var allDocuments = await asyncSession.Query<object>()
    .Customize(x => x.ShardContext(s => s.ByDocumentId("companies/1")))
    .ToListAsync();

DocumentQuery (sync session):
// Query for 'User' documents from a specific shard:
// =================================================
var userDocuments = session.Advanced.DocumentQuery<User>()
    // Call 'ShardContext' to select which shard to query
    .ShardContext(s => s.ByDocumentId("companies/1"))
    // The query predicate
    .Where(x => x.Name == "Joe")
    .ToList();

// Query for ALL documents from a specific shard:
// ==============================================
var allDocuments = session.Advanced.DocumentQuery<object>()
    .ShardContext(s => s.ByDocumentId("companies/1"))
    .ToList();

DocumentQuery (async session):
// Query for 'User' documents from a specific shard:
// =================================================
var userDocuments = await asyncSession.Advanced.AsyncDocumentQuery<User>()
    // Call 'ShardContext' to select which shard to query
    .ShardContext(s => s.ByDocumentId("companies/1"))
    // The query predicate
    .WhereEquals(x => x.Name, "Joe")
    .ToListAsync();

// Query for ALL documents from a specific shard:
// ==============================================
var allDocuments = await asyncSession.Advanced.AsyncDocumentQuery<object>()
    .ShardContext(s => s.ByDocumentId("companies/1"))
    .ToListAsync();

RQL:
// Query for 'User' documents from a specific shard:
// =================================================
from "Users"
where Name == "Joe"
{ "__shardContext": "companies/1" }

// Query for ALL documents from a specific shard:
// ==============================================
from @all_docs
where Name == "Joe"
{ "__shardContext": "companies/1" }

Query selected shards:

Query only the shards containing documents companies/2 and companies/3:

Query (sync session):
// Query for 'User' documents from the specified shards:
// =====================================================
var userDocuments = session.Query<User>()
     // Call 'ShardContext' to select which shards to query
     // RavenDB will query only the shards containing documents "companies/2" & "companies/3"
    .Customize(x => x.ShardContext(s => s.ByDocumentIds(new[] { "companies/2", "companies/3" })))
     // The query predicate
    .Where(x => x.Name == "Joe")
    .ToList();

// Variable 'userDocuments' will include all documents of type 'User' that match the query predicate
// and reside on either the shard containing document 'companies/2'
// or the shard containing document 'companies/3'.

// To get ALL documents from the designated shards instead of just 'User' documents,
// query with `session.Query<object>`.

Query (async session):
// Query for 'User' documents from the specified shards:
// =====================================================
var userDocuments = await asyncSession.Query<User>()
     // Call 'ShardContext' to select which shards to query
    .Customize(x => x.ShardContext(s => s.ByDocumentIds(new[] { "companies/2", "companies/3" })))
     // The query predicate
    .Where(x => x.Name == "Joe")
    .ToListAsync();

DocumentQuery (sync session):
// Query for 'User' documents from the specified shards:
// =====================================================
var userDocuments = session.Advanced.DocumentQuery<User>()
     // Call 'ShardContext' to select which shards to query
    .ShardContext(s => s.ByDocumentIds(new[] {"companies/2", "companies/3"}))
     // The query predicate
    .Where(x => x.Name == "Joe")
    .ToList();

DocumentQuery (async session):
// Query for 'User' documents from the specified shards:
// =====================================================
var userDocuments = await asyncSession.Advanced.AsyncDocumentQuery<User>()
     // Call 'ShardContext' to select which shards to query
    .ShardContext(s => s.ByDocumentIds(new[] {"companies/2", "companies/3"}))
     // The query predicate
    .WhereEquals(x => x.Name, "Joe")
    .ToListAsync();

RQL:
// Query for 'User' documents from the specified shards:
// =====================================================
from "Users"
where Name == "Joe"
{ "__shardContext" : ["companies/2", "companies/3"] }

// Query for ALL documents from the specified shards:
// ==================================================
from @all_docs
where Name == "Joe"
{ "__shardContext" : ["companies/2", "companies/3"] }

Including items

  • Including items by a query or an index will work even if the included item resides on another shard.
    If the requested item is not located on this shard, the orchestrator will fetch it from the shard where it is located.

  • Note that this process costs an extra trip to the shard that hosts the requested item.
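The include behavior described above can be modeled as follows. This is a hypothetical sketch, not RavenDB code: the document-to-shard map, the order-to-company references, and the helper `ResolveInclude` are invented for illustration. The model only counts how often the orchestrator needs an extra trip because an included document lives on a different shard than the result that references it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical layout: which shard hosts each document.
var shardOf = new Dictionary<string, int>
{
    ["orders/1"] = 0, ["orders/2"] = 1,
    ["companies/1"] = 0, ["companies/2"] = 0,
};

// Each order references the company we want to include.
var companyOf = new Dictionary<string, string>
{
    ["orders/1"] = "companies/1",   // hosted on the same shard as the order
    ["orders/2"] = "companies/2",   // hosted on a different shard
};

int extraTrips = 0;

// Resolve the include for an order returned by the shard that hosts it.
string ResolveInclude(string orderId)
{
    string companyId = companyOf[orderId];
    if (shardOf[companyId] != shardOf[orderId])
        extraTrips++;   // the orchestrator fetches it from the shard where it is located
    return companyId;
}

var included = companyOf.Keys.Select(id => ResolveInclude(id)).ToList();
Console.WriteLine($"Included {included.Count} documents; extra trips: {extraTrips}");
```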

Paging results

From the client's point of view, paging is conducted similarly in sharded and non-sharded databases,
and the same API is used to define page size and retrieve selected pages.

Under the hood, however, performing paging in a sharded database entails some overhead since the orchestrator is required to load the requested data from each shard and sort the retrieved results before handing the selected page to the client.

For example, let's compare what happens when we load the 8th page (with a page size of 100) from a non-sharded and a sharded database:

Query:
IList<Product> results = session
    .Query<Product, Products_ByUnitsInStock>()
    .Statistics(out QueryStatistics stats) // fill query statistics
    .Where(x => x.UnitsInStock > 10)
    .Skip(700) // skip the first 7 pages (700 results)
    .Take(100) // get page 8 (results 701-800)
    .ToList();

long totalResults = stats.TotalResults;

DocumentQuery:
IList<Product> results = session
    .Advanced
    .DocumentQuery<Product, Products_ByUnitsInStock>()
    .Statistics(out QueryStatistics stats) // fill query statistics
    .WhereGreaterThan(x => x.UnitsInStock, 10)
    .Skip(700) // skip the first 7 pages (700 results)
    .Take(100) // get page 8 (results 701-800)
    .ToList();

long totalResults = stats.TotalResults;

Index:
public class Products_ByUnitsInStock : AbstractIndexCreationTask<Product>
{
    public Products_ByUnitsInStock()
    {
        Map = products => from product in products
                          select new
                          {
                              UnitsInStock = product.UnitsInStock
                          };
    }
}

  • When the database is not sharded, the server would:

    • Skip 7 pages.
    • Hand page 8 to the client (results 701 to 800).
  • When the database is sharded, the orchestrator would:

    • Load 8 pages (sorted by modification order) from each shard.
    • Sort the retrieved results (in a 3-shard database, for example, the orchestrator would sort 2400 results).
    • Skip 7 pages (of 24).
    • Hand page 8 to the client (results 701 to 800).

The shards sort the data by modification order before sending it to the orchestrator.
For example, if a shard is required to send 800 results to the orchestrator, the first result will be the most recently modified document, while the last result will be the least recently modified of those 800 documents.
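The arithmetic above (3 shards × 8 pages × 100 results = 2400 sorted results) can be checked with a small in-memory model. It is a sketch, not RavenDB code: the 3000 results, their round-robin distribution, and the `Modified` value standing in for modification order are hypothetical. The model also shows why each shard must send all 8 pages: a shard cannot tell which of its results will end up on the global page 8.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

const int ShardCount = 3, PageSize = 100, PagesToSkip = 7;
const int SkipCount = PagesToSkip * PageSize;   // 700

// Hypothetical data: 3000 results spread round-robin over 3 shards.
// 'Modified' stands in for modification order (higher = more recent).
var shards = Enumerable.Range(0, ShardCount)
    .Select(s => Enumerable.Range(0, 3000)
        .Where(i => i % ShardCount == s)
        .Select(i => (Id: $"products/{i}", Modified: (long)i))
        .ToList())
    .ToList();

// Each shard sorts its results (most recently modified first) and sends the
// first 8 pages, since it cannot know which of them global page 8 will need.
var fromShards = shards
    .Select(shard => shard.OrderByDescending(r => r.Modified).Take(SkipCount + PageSize))
    .SelectMany(results => results)
    .ToList();

// The orchestrator sorts everything it received, skips 7 pages,
// and hands page 8 (results 701 to 800) to the client.
var page = fromShards
    .OrderByDescending(r => r.Modified)
    .Skip(SkipCount)
    .Take(PageSize)
    .ToList();

Console.WriteLine($"Results sorted by the orchestrator: {fromShards.Count}");   // 2400
Console.WriteLine($"Page 8: {page.First().Id} .. {page.Last().Id}");
```

The returned page is the same one a non-sharded database would produce; the price is that the orchestrator sorts 2400 results to deliver 100.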

Filtering results

  • Data can be filtered using the where and filter keywords on both non-sharded and sharded databases.

  • There are, however, differences in the behavior of these commands on sharded and non-sharded databases.
    This section explains these differences.


where

where is RavenDB's basic filtering command.
It is used by the server to restrict data retrieval from the database to only those items that match given conditions.

  • On a non-sharded database:
    When a query that applies where is executed over a non-sharded database,
    the filtering applies to the entire database.

    To find only the most successful products, we can easily run a query such as:

    from index 'Products/Sales'
    where TotalSales >= 5000

    This will retrieve only the results for products that were sold at least 5000 times.

  • On a sharded database:
    When a query that includes a where clause is sent to a sharded database,
    filtering is applied per-shard, over each shard's database.

    This presents us with the following problem:
    The filtering that runs on each shard takes into account only the data present on that shard.
    If a certain product was sold 4000 times on each shard, the query demonstrated above will filter this product out on each shard, even though its total sales far exceed 5000.

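    To solve this problem, the role of the filter command is altered on sharded databases.

    Using where raises no problem, and is actually recommended, when the filtering is done over a GroupBy field:
    for any given group, the GroupBy field has the same value in every shard's partial results, so all shards keep or drop that group consistently.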
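The pitfall described above, and why a GroupBy field avoids it, can be reproduced with a small in-memory model. This is a hypothetical sketch, not RavenDB code: the per-shard totals and the helper `WhereOnShards` are invented, and the model assumes, as described above, that `where` runs on each shard over that shard's partial results before the orchestrator reduces them again.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical partial results of the 'Products/Sales' index on three shards:
// product ID (the GroupBy field) -> TotalSales counted on that shard only.
var shards = new[]
{
    new Dictionary<string, int> { ["products/1"] = 4000, ["products/2"] = 6000 },
    new Dictionary<string, int> { ["products/1"] = 4000 },
    new Dictionary<string, int> { ["products/1"] = 4000, ["products/3"] = 100 },
};

// Each shard applies the predicate to its own partial results, then the
// orchestrator re-reduces whatever the shards returned.
Dictionary<string, int> WhereOnShards(Func<KeyValuePair<string, int>, bool> predicate) =>
    shards
        .SelectMany(shard => shard.Where(predicate))
        .GroupBy(kv => kv.Key)
        .ToDictionary(g => g.Key, g => g.Sum(kv => kv.Value));

// 'where TotalSales >= 5000': products/1 (12000 in total) is dropped on every shard.
var bySales = WhereOnShards(kv => kv.Value >= 5000);

// 'where' on the GroupBy field: every shard keeps products/1, so its total is complete.
var byKey = WhereOnShards(kv => kv.Key == "products/1");

Console.WriteLine(string.Join(", ", bySales.Keys));   // prints products/2
Console.WriteLine(byKey["products/1"]);               // prints 12000
```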


filter

The filter command is used when we want to scan data that has already been retrieved from the database but is still on the server.

  • On a non-sharded database:
    When a query that includes a filter clause is sent to a non-sharded database, filter is mainly used for exploration queries: an additional layer of filtering that scans the entire retrieved dataset without creating an index that would then have to be maintained.

    We consider exploration queries one-time operations and use them cautiously, because scanning the entire retrieved dataset may take a high toll on resources.

  • On a sharded database:
    When a query that includes a filter clause is sent to a sharded database:

    • The filter clause is removed from the query that is sent to the shards.
      All data is retrieved from the shards to the orchestrator.
    • The filter clause is executed on the orchestrator machine over the entire retrieved dataset.

    On the downside,
    a huge amount of data may be retrieved from the database and then scanned by the filtering condition.

    On the upside,
    this mechanism allows us to filter data using computed fields, as we do over a non-sharded database.
    The below query, for example, will indeed return all the products that were sold at least 5000 times,
    no matter how their sales are divided between the shards.

    from index 'Products/Sales'
    filter TotalSales >= 5000

    The volume of results retrieved from the shards can be reduced (when it makes sense as part of the query)
    by applying where over a GroupBy field before calling filter.
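A small in-memory model illustrates the filter flow described above, and how a preceding `where` on a GroupBy field reduces the number of rows sent to the orchestrator. It is a hypothetical sketch, not RavenDB code: the per-shard rows and the helper `Query` are invented, and `Category` is assumed to be a GroupBy field of the index.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical partial results of the 'Products/Sales' index on three shards.
var shards = new List<(string Product, string Category, int Sales)[]>
{
    new[] { ("products/1", "categories/7-A", 4000), ("products/2", "categories/1-A", 6000) },
    new[] { ("products/1", "categories/7-A", 4000), ("products/3", "categories/1-A", 300) },
    new[] { ("products/1", "categories/7-A", 4000) },
};

// Hypothetical model of 'where' (on the shards) + 'filter TotalSales >= 5000'
// (on the orchestrator). Returns the results and how many rows left the shards.
(List<(string Product, int Total)> Results, int Transferred) Query(
    Func<(string Product, string Category, int Sales), bool> shardPredicate)
{
    // 1. The filter clause is not sent to the shards; only 'where' runs there.
    var gathered = shards.SelectMany(shard => shard.Where(shardPredicate)).ToList();

    // 2. The orchestrator re-reduces the partial results, then
    // 3. applies the filter to the complete totals.
    var results = gathered
        .GroupBy(r => r.Product)
        .Select(g => (Product: g.Key, Total: g.Sum(r => r.Sales)))
        .Where(r => r.Total >= 5000)
        .OrderBy(r => r.Product, StringComparer.Ordinal)
        .ToList();
    return (results, gathered.Count);
}

var filterOnly = Query(_ => true);
var whereThenFilter = Query(r => r.Category == "categories/7-A");

Console.WriteLine($"filter only: {filterOnly.Results.Count} products, {filterOnly.Transferred} rows transferred");
Console.WriteLine($"where + filter: {whereThenFilter.Results.Count} products, {whereThenFilter.Transferred} rows transferred");
```

With filter alone, products/1 is found (12000 in total) even though no single shard reaches 5000; adding where on the GroupBy field narrows the result to that category and cuts the transferred rows from 5 to 3.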


where vs filter recommendations

Because filter may (unless where is also used) cause the retrieval and scanning of a substantial amount of data,
it is recommended to use filter cautiously and restrict its operation wherever needed.

  • Prefer where over filter when the query is executed over a GroupBy field.
  • Prefer filter over where when the query is executed over a computed field, such as a Total or Sum field.
  • When using filter, set a limit if possible.
  • When filter is needed, use where first to minimize the dataset that needs to be transferred from the shards to the orchestrator and scanned by filter on the orchestrator machine. For example:
    from index 'Products/Sales'
    where Category = 'categories/7-A'
    filter TotalSales >= 5000

Querying Map-Reduce indexes

Loading document within a projection

Loading a document within a Map-Reduce projection is not supported in a sharded database.

When attempting to load a document from a Map-Reduce projection, the database will respond with a NotSupportedInShardingException, specifying that "Loading a document inside a projection from a Map-Reduce index isn't supported."

Unlike Map-Reduce index projections, projections of queries that use no index and projections of Map indexes can load a document, provided that the document is on this shard.

Projection                    Can load a document   Condition
Query projection              Yes                   The document is on this shard
Map index projection          Yes                   The document is on this shard
Map-Reduce index projection   No                    -

OrderBy in a Map-Reduce index query

As in a non-sharded database, OrderBy is used in an index query or a dynamic query to sort the retrieved dataset in a given order.

In a sharded database, however, when OrderBy is used in a Map-Reduce index query and a limit is applied to restrict the number of retrieved results, there are scenarios in which all the results will still be retrieved from all shards.
To understand how this can happen, let's run a few queries over the UserMapReduce index, whose Reduce function is:

// Reduce function of the 'UserMapReduce' index (Map function not shown)
Reduce = results =>
    from result in results
    group result by result.Name
    into g
    select new Result
    {
        // Group-by field (reduce key)
        Name = g.Key,
        // Computation field
        Sum = g.Sum(x => x.Sum)
    };

  • The first query sorts the results using OrderBy without setting any limit.
    This will load all matching results from all shards (just like this query would load all matching results from a non-sharded database).

    var queryResult = session.Query<UserMapReduce.Result, UserMapReduce>()
            .OrderBy(x => x.Name)
            .ToList();

  • The second query sorts the results by one of the GroupBy fields, Name, and sets a limit of 3 results.
    Because Name is a GroupBy field, the limit is applied while the results are retrieved from the shards, so only a limited dataset is retrieved.

    var queryResult = session.Query<UserMapReduce.Result, UserMapReduce>()
            .OrderBy(x => x.Name)
            .Take(3) // this limit will apply while retrieving the items
            .ToList();

  • The third query sorts the results not by a GroupBy field but by Sum, a field computed from the retrieved values. This retrieves all the results from all shards regardless of the set limit, performs the computation over them all, and only then sorts them and returns just the number of results requested.

    var queryResult = session.Query<UserMapReduce.Result, UserMapReduce>()
            .OrderBy(x => x.Sum)
            .Take(3) // this limit will only apply after retrieving all items
            .ToList();

Note that retrieving all the results from all shards, either by setting no limit or by setting a limit on a query sorted by a computed field as demonstrated above, may cause the retrieval of a large amount of data and increase memory, CPU, and bandwidth usage.
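Why sorting by a GroupBy field lets the limit be applied on the shards, while sorting by a computed field does not, can be seen in a small in-memory model. It is a sketch, not RavenDB code: the per-shard (Name, Sum) rows and the local `Reduce` helper are hypothetical stand-ins for the partial results of UserMapReduce and the orchestrator's re-reduce.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

const int Limit = 3;

// Hypothetical partial (Name, Sum) results of 'UserMapReduce' on two shards.
var shards = new List<(string Name, int Sum)[]>
{
    new[] { ("Alice", 5), ("Bob", 1), ("Carol", 2), ("Dave", 4) },
    new[] { ("Alice", 1), ("Bob", 8), ("Carol", 10), ("Eve", 3) },
};

// Orchestrator-side reduce: group partial results by Name and sum them.
List<(string Name, int Sum)> Reduce(IEnumerable<(string Name, int Sum)> rows) =>
    rows.GroupBy(r => r.Name).Select(g => (Name: g.Key, Sum: g.Sum(r => r.Sum))).ToList();

// Reference answers: reduce ALL partial results first, then sort and limit.
var all = Reduce(shards.SelectMany(s => s));
var correctByName = all.OrderBy(r => r.Name, StringComparer.Ordinal).Take(Limit).ToList();
var correctBySum = all.OrderBy(r => r.Sum).Take(Limit).ToList();

// OrderBy(Name).Take(3): each shard may send only its first 3 rows, because a
// name in the global top 3 is also in the top 3 of every shard that holds it.
var byNameLimited = Reduce(shards.SelectMany(s => s.OrderBy(r => r.Name, StringComparer.Ordinal).Take(Limit)))
    .OrderBy(r => r.Name, StringComparer.Ordinal).Take(Limit).ToList();

// OrderBy(Sum).Take(3): limiting on the shards would rank by partial sums,
// so the result can be wrong; all rows must be retrieved and reduced first.
var bySumLimitedOnShards = Reduce(shards.SelectMany(s => s.OrderBy(r => r.Sum).Take(Limit)))
    .OrderBy(r => r.Sum).Take(Limit).ToList();

Console.WriteLine(string.Join(", ", correctBySum.Select(r => $"{r.Name}={r.Sum}")));         // prints Eve=3, Dave=4, Alice=6
Console.WriteLine(string.Join(", ", bySumLimitedOnShards.Select(r => $"{r.Name}={r.Sum}"))); // prints Alice=1, Carol=2, Eve=3
```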

Timing queries

  • The duration of queries and query parts (e.g. optimization or execution time) can be measured using the API or the Studio.

  • In a sharded database, the timings for each part will be provided per shard.

  • Timing is disabled by default, to avoid the measuring overhead.
    It can be enabled per query by adding include timings() to an RQL query or calling Timings() in your query code, as explained in include query timings.

  • To view the query timings in the Studio, open the Query View,
    run an RQL query with include timings(), and open the Timings tab.

Timing Shards Querying

  1. Textual view of query parts and their duration.
    Point the mouse cursor at captions to display timing properties in the graphical view on the right.
  2. Per-shard Timings
  3. Graphical View
    Point the mouse cursor at graph sections to display the duration of query parts:
    A. Shard #0 overall query time
    B. Shard #0 optimization period
    C. Shard #0 query period
    D. Shard #0 staleness period

Unsupported querying features

Querying features that are not supported or not yet implemented on sharded databases include:

  • Loading a document that resides on another shard
    An index or a query can only load a document if it resides on the same shard.
    Loading a document that resides on a different shard will return null instead of the loaded document.

  • Loading a document within a map-reduce projection
    See Loading document within a projection above.

  • Streaming Map-Reduce results
    Streaming map-reduce results is not supported in a sharded database.

  • Querying with a limit is not supported in patch/delete by query operations
    Attempting to set a limit when executing PatchByQueryOperation or DeleteByQueryOperation
    will throw a NotSupportedInShardingException.

  • Querying for similar documents with MoreLikeThis
    Method MoreLikeThis is not supported in a sharded database.