Sharding: Querying



Querying in a Sharded Database

From a user's point of view, querying a sharded RavenDB database is similar to querying a non-sharded database: the query syntax is the same, and results are returned in the same format.

To make this possible, the database performs the following steps when we send a query to a sharded database:

  • The query is received by a RavenDB server that was appointed orchestrator and now mediates all the communications between the client and the database shards.
  • The orchestrator distributes the query to the shards.
  • Each shard runs the query over its own database, using its own indexes.
    When the data is retrieved, the shard transfers it to the orchestrator.
  • The orchestrator combines the data it received from all shards into a single dataset, and may perform additional operations over it.
    E.g., a map-reduce query retrieves from the shards data that has already been reduced by their map-reduce indexes, and once the orchestrator has all the data it reduces the full dataset once again.
  • Finally, the orchestrator returns the combined dataset to the client.
  • The client remains unaware that it has just communicated with a sharded database.
    Note, however, that this process is costly in comparison with the simple data retrieval performed by non-sharded databases.
    Sharding is therefore recommended only when the database has grown to substantial size and complexity.
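
The orchestrator's merge step can be pictured with a small in-memory model. The sketch below is not RavenDB code and does not claim to reflect the server's implementation: the SalesEntry record and the ReReduce method are hypothetical names, and each shard is represented simply by a list of entries that it has already reduced on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of one reduced entry: a product and its sales count.
public record SalesEntry(string Product, int TotalSales);

public static class OrchestratorSketch
{
    // Combines the per-shard reduce results and reduces them once more,
    // so that each product ends up with a single, database-wide total.
    public static List<SalesEntry> ReReduce(IEnumerable<IEnumerable<SalesEntry>> shardResults)
    {
        return shardResults
            .SelectMany(entries => entries)
            .GroupBy(entry => entry.Product)
            .Select(group => new SalesEntry(group.Key, group.Sum(e => e.TotalSales)))
            .OrderBy(entry => entry.Product, StringComparer.Ordinal)
            .ToList();
    }

    public static void Main()
    {
        // Each shard has already reduced its own documents.
        var shard0 = new[] { new SalesEntry("products/1-A", 4000), new SalesEntry("products/2-A", 100) };
        var shard1 = new[] { new SalesEntry("products/1-A", 4000) };

        foreach (var entry in ReReduce(new[] { shard0, shard1 }))
            Console.WriteLine($"{entry.Product}: {entry.TotalSales}");
    }
}
```

In this model, products/1-A appears once per shard with 4000 sales and once in the combined result with 8000, which is why the second reduce on the orchestrator is needed.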

Querying Map-Reduce Indexes

  • Map-reduce indexes on a sharded database are used to reduce data both on each shard during indexing, and on the orchestrator machine each time a query uses them.
  • Read more below about querying map-reduce indexes in a sharded database.

Filtering Results in a Sharded Database

  • We can filter data using the keywords where and filter on both non-sharded and sharded databases.
  • There are, however, differences in the behavior of these commands on sharded and non-sharded databases. This section explains these differences.

where

where is RavenDB's basic filtering command. It is used by the server to restrict data retrieval from the database to only those items that match given conditions.

  • where on a non-sharded database
    When a query that applies where is executed over a non-sharded database, the filtering applies to the entire database.

    If we query all products, for example, and want to find only the most successful products, we can easily run a query such as:

    from index 'Products/Sales'
    where TotalSales >= 5000

    This will retrieve only the documents of products that were sold at least 5000 times.
  • where on a sharded database
    When a query that includes a where clause is sent to a sharded database, filtering is applied per-shard, over each shard's database.

    This presents us with the following problem:
    The filtering that runs on each shard takes into account only the data present on that shard.
    If a certain product was sold 4000 times on each shard, the query demonstrated above will filter this product out on each shard, even though its total sales far exceed 5000.

    To solve this problem, the role of the filter command is altered on sharded databases.

    Using where raises no problem, and is actually recommended, when the filtering is done over a GroupBy field.

filter

The filter command is used when we want to scan data that has already been retrieved from the database but is still on the server.

  • When a query that includes a filter clause is sent to a non-sharded database, its main usage is as an exploration query: an additional layer of filtering that scans the entire retrieved dataset without creating an index that would then have to be maintained.

    We consider exploration queries one-time operations and use them cautiously because scanning the entire retrieved dataset may take a high toll on resources.

  • When a query that includes a filter clause is sent to a sharded database:

    • The filter clause is omitted from the query.
      All data is retrieved from the shards to the orchestrator.
    • The filter clause is executed on the orchestrator machine over the entire downloaded dataset.

The downside is that a huge amount of data may be retrieved from the database and then scanned by the filtering condition.

The upside is that this mechanism allows us to filter data by computed fields, as we do over a non-sharded database.
The query below, for example, will indeed return all the products that were sold at least 5000 times, no matter how their sales are divided between the shards.

from index 'Products/Sales'
filter TotalSales >= 5000

The volume of results retrieved from the shards can be decreased (when it makes sense as part of the query) by applying where over a GroupBy field before calling filter.
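
The difference between the two commands can be illustrated with an in-memory model. This is a sketch under simplifying assumptions, not RavenDB's implementation: each shard is a hypothetical dictionary that maps a product to the sales total already reduced on that shard, and WherePerShard, FilterOnOrchestrator and Combine are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class WhereVsFilterSketch
{
    // Orchestrator-side re-reduce: sums the per-shard totals of each product.
    private static Dictionary<string, int> Combine(
        IEnumerable<IEnumerable<KeyValuePair<string, int>>> shardEntries)
    {
        return shardEntries
            .SelectMany(entries => entries)
            .GroupBy(entry => entry.Key)
            .ToDictionary(group => group.Key, group => group.Sum(entry => entry.Value));
    }

    // Models "where TotalSales >= minSales": every shard tests only its own totals,
    // and the orchestrator combines whatever survived.
    public static Dictionary<string, int> WherePerShard(
        IEnumerable<Dictionary<string, int>> shards, int minSales)
    {
        return Combine(shards.Select(shard => shard.Where(entry => entry.Value >= minSales)));
    }

    // Models "filter TotalSales >= minSales": all entries reach the orchestrator,
    // are combined, and only then tested against the condition.
    public static Dictionary<string, int> FilterOnOrchestrator(
        IEnumerable<Dictionary<string, int>> shards, int minSales)
    {
        return Combine(shards)
            .Where(entry => entry.Value >= minSales)
            .ToDictionary(entry => entry.Key, entry => entry.Value);
    }

    public static void Main()
    {
        var shards = new[]
        {
            new Dictionary<string, int> { ["products/1-A"] = 4000, ["products/2-A"] = 6000 },
            new Dictionary<string, int> { ["products/1-A"] = 4000 }
        };

        // Prints only products/2-A: products/1-A was dropped on both shards.
        Console.WriteLine(string.Join(", ", WherePerShard(shards, 5000).Keys.OrderBy(k => k)));
        // Prints both products: products/1-A totals 8000 once combined.
        Console.WriteLine(string.Join(", ", FilterOnOrchestrator(shards, 5000).Keys.OrderBy(k => k)));
    }
}
```

The model also shows the cost side of filter: FilterOnOrchestrator has to combine every entry from every shard before it can discard any of them.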

Projection

Loading a document within a map-reduce projection is not supported in a Sharded Database.

Database response to an attempt to load a document from a map-reduce projection:
A NotSupportedInShardingException exception will be thrown, specifying "Loading a document inside a projection from a map-reduce index isn't supported".

Unlike map-reduce index projections, projections of queries that use no index and projections of map indexes can load a document, provided that the document is on this shard.

Projection                     Can load Document   Conditions
Query Projection               Yes                 The document is on this shard
Map Index Projection           Yes                 The document is on this shard
Map-Reduce Index Projection    No

OrderBy in a Map-Reduce Index

As in a non-sharded database, OrderBy is used in an index or a query to sort the retrieved dataset by a given order.

But under a sharded database, when OrderBy is used in a map-reduce index and limit is applied to restrict the number of retrieved results, there are scenarios in which all the results will still be retrieved from all shards.
To understand how this can happen, let's run a few queries over this map-reduce index:

Reduce = results =>
    from result in results
    group result by result.Name
    into g
    select new Result
    {
        // Group-by field (reduce key)
        Name = g.Key,
        // Computation field
        Sum = g.Sum(x => x.Sum)
    };
  • The first query sorts the results using OrderBy without setting any limit.
    This will load all matching results from all shards (just like this query would load all matching results from a non-sharded database).

    var queryResult = session.Query<UserMapReduce.Result, UserMapReduce>()
            .OrderBy(x => x.Name)
            .ToList();
  • The second query sorts the results by one of the GroupBy fields, Name, and sets a limit to restrict the retrieved dataset to 3 results.
    This will restrict the retrieved dataset to the set limit.

    var queryResult = session.Query<UserMapReduce.Result, UserMapReduce>()
            .OrderBy(x => x.Name)
            .Take(3) // this limit will apply while retrieving the items
            .ToList();
  • The third query sorts the results not by a GroupBy field but by a field that computes a sum from retrieved values.
    This will retrieve all the results from all shards regardless of the set limit, perform the computation over them all, and only then sort them and provide us with just the number of results we requested.

    var queryResult = session.Query<UserMapReduce.Result, UserMapReduce>()
            .OrderBy(x => x.Sum)
            .Take(3) // this limit will only apply after retrieving all items
            .ToList();

Note that retrieving all the results from all shards, either by setting no limit or by setting a limit based on a computation as demonstrated above, may cause the retrieval of a large amount of data and increase memory, CPU, and bandwidth usage.
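
The following in-memory sketch illustrates why a limit can be applied on the shards when sorting by the group-by field, but not when sorting by the computed field. It is a model of the reasoning only, not of RavenDB's internals: each shard is a hypothetical dictionary from Name to the partial Sum reduced on that shard, and the limitOnShards flag exists purely to show what would go wrong if the limit were applied to partial sums.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OrderByLimitSketch
{
    // Orchestrator-side re-reduce: sums the partial Sum of each Name across shards.
    private static IEnumerable<KeyValuePair<string, int>> Merge(
        IEnumerable<IEnumerable<KeyValuePair<string, int>>> parts)
    {
        return parts
            .SelectMany(part => part)
            .GroupBy(entry => entry.Key)
            .Select(group => new KeyValuePair<string, int>(group.Key, group.Sum(e => e.Value)));
    }

    // Order by the group-by field: each shard sends only its first `limit` names.
    public static List<string> FirstByName(IEnumerable<Dictionary<string, int>> shards, int limit)
    {
        var parts = shards.Select(shard =>
            shard.OrderBy(e => e.Key, StringComparer.Ordinal).Take(limit));

        return Merge(parts)
            .OrderBy(e => e.Key, StringComparer.Ordinal)
            .Take(limit)
            .Select(e => $"{e.Key}={e.Value}")
            .ToList();
    }

    // Order by the computed field. With limitOnShards == true the limit is applied
    // to the partial sums on each shard (incorrect); with false, all entries are merged first.
    public static List<string> FirstBySum(
        IEnumerable<Dictionary<string, int>> shards, int limit, bool limitOnShards)
    {
        var parts = shards.Select(shard => limitOnShards
            ? shard.OrderBy(e => e.Value).ThenBy(e => e.Key, StringComparer.Ordinal).Take(limit)
            : shard.AsEnumerable());

        return Merge(parts)
            .OrderBy(e => e.Value)
            .ThenBy(e => e.Key, StringComparer.Ordinal)
            .Take(limit)
            .Select(e => $"{e.Key}={e.Value}")
            .ToList();
    }

    public static void Main()
    {
        var shards = new[]
        {
            new Dictionary<string, int> { ["a"] = 1, ["b"] = 2, ["c"] = 3, ["d"] = 10 },
            new Dictionary<string, int> { ["a"] = 10, ["d"] = 1, ["e"] = 5, ["f"] = 6 }
        };

        Console.WriteLine(string.Join(", ", FirstByName(shards, 3)));        // a=11, b=2, c=3
        Console.WriteLine(string.Join(", ", FirstBySum(shards, 3, true)));   // a=1, d=1, b=2 (wrong)
        Console.WriteLine(string.Join(", ", FirstBySum(shards, 3, false)));  // b=2, c=3, e=5
    }
}
```

A name that belongs in the first three names overall is also among the first three names on every shard that holds it, so limiting on the shards loses nothing. A partial sum says nothing about the final sum, so the same shortcut returns wrong results when sorting by Sum.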

where vs filter Recommendations

As using filter may (unless where is also used) cause the retrieval and scanning of a substantial amount of data, it is recommended to use filter cautiously and restrict its operation wherever needed.

  • Prefer where over filter when the query is executed over a GroupBy field.
  • Prefer filter over where when the query is executed over a computed field, such as a Total or Sum field.
  • When using filter, set a limit if possible.
  • When filter is needed, use where first to minimize the dataset that needs to be transferred from the shards to the orchestrator and scanned by filter on the orchestrator machine. For example:
    from index 'Products/Sales'
    where Category = 'categories/7-A'
    filter TotalSales >= 5000

Including Items

Including items by a query or an index will work even if an included item resides on another shard.
If a requested item is not located on this shard, the orchestrator will fetch it from the shard that does host it.
Note that this process costs an extra trip to the shard that hosts the requested item.

Querying a Selected Shard

A query is normally executed over all shards.
It is, however, also possible to query only selected shards.
Query a selected shard when you know in advance that the documents you need to query reside on this shard, to avoid redundant trips to other shards.

This feature can be helpful when, for example, all the documents related to a specific account are deliberately stored on the same shard, and when it's time to query any of them the query is sent only to this shard.

  • To query a specific shard or a list of specific shards, add a ShardContext object to the query that specifies the shard/s to query.
  • You can discover what shard or shards documents are stored on using ByDocumentId or ByDocumentIds.
  • Examples:

    Query only the shard containing users/1:

    var queryResult = session.Advanced.DocumentQuery<User>()
            // Which shard to query
            .ShardContext(s => s.ByDocumentId("users/1"))
            // The query
            .SelectFields<string>("Occupation").ToList();

    Query only the shard/s containing users/2 and users/3:

    var queryResult = session.Advanced.DocumentQuery<User>()
            // Which shards to query
            .ShardContext(s => s.ByDocumentIds(new[] { "users/2", "users/3" }))
            // The query
            .SelectFields<string>("Occupation").ToList();

Paging

From a client's point of view, paging is conducted similarly in sharded and non-sharded databases, and the same API is used to define page size and retrieve selected pages.

Under the hood, however, performing paging in a sharded database entails some overhead since the orchestrator is required to load the requested data from each shard and sort the retrieved results before handing the selected page to the user.

  • E.g., let's see what happens when we load the 8th page (where the page size is 100) from a non-sharded and a sharded database.

    Query:

    IList<Product> results = session
        .Query<Product, Products_ByUnitsInStock>()
        .Statistics(out QueryStatistics stats) // fill query statistics
        .Where(x => x.UnitsInStock > 10)
        .Skip(700) // skip the first 7 pages (700 results)
        .Take(100) // get results 701-800 (page 8)
        .ToList();

    long totalResults = stats.TotalResults;

    DocumentQuery:

    IList<Product> results = session
        .Advanced
        .DocumentQuery<Product, Products_ByUnitsInStock>()
        .Statistics(out QueryStatistics stats) // fill query statistics
        .WhereGreaterThan(x => x.UnitsInStock, 10)
        .Skip(700) // skip the first 7 pages (700 results)
        .Take(100) // get results 701-800 (page 8)
        .ToList();

    long totalResults = stats.TotalResults;

    Index:

    public class Products_ByUnitsInStock : AbstractIndexCreationTask<Product>
    {
        public Products_ByUnitsInStock()
        {
            Map = products => from product in products
                              select new
                              {
                                  UnitsInStock = product.UnitsInStock
                              };
        }
    }

    When the database is not sharded, the server would:

    • skip 7 pages.
    • hand page 8 to the client (results 701 to 800).

    When the database is sharded, the orchestrator would:

    • load 8 pages (sorted by modification order) from each shard.
    • sort the retrieved results (in a 3-shard database, for example, the orchestrator would sort 2400 results).
    • skip 7 pages (of 24).
    • hand page 8 to the client (results 701 to 800).

The shards sort the data by modification order before sending it to the orchestrator.
If a shard is required to send 800 results to the orchestrator, for example, the first result would be the document modified most recently and the document modified first would be the last result.
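
The paging overhead described above can be modeled in memory. The sketch below is an illustration under simplifying assumptions rather than the orchestrator's actual code: every item is represented only by a hypothetical "last modified" tick value, and ShardedPagingSketch.GetPage is an invented name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ShardedPagingSketch
{
    // Returns the requested page and the number of results the shards had to send.
    public static (List<long> Page, int Transferred) GetPage(
        IEnumerable<IEnumerable<long>> shards, int skip, int take)
    {
        // Every shard sends its first (skip + take) results, most recently modified first.
        List<List<long>> perShard = shards
            .Select(shard => shard.OrderByDescending(tick => tick).Take(skip + take).ToList())
            .ToList();

        int transferred = perShard.Sum(results => results.Count);

        // The orchestrator sorts everything it received, skips, and takes one page.
        List<long> page = perShard
            .SelectMany(results => results)
            .OrderByDescending(tick => tick)
            .Skip(skip)
            .Take(take)
            .ToList();

        return (page, transferred);
    }

    public static void Main()
    {
        // 3 shards holding 1000 items each; item i lives on shard i % 3.
        var shards = Enumerable.Range(0, 3)
            .Select(shard => Enumerable.Range(0, 3000)
                .Where(i => i % 3 == shard)
                .Select(i => (long)i))
            .ToList();

        var (page, transferred) = GetPage(shards, skip: 700, take: 100);

        // Prints: Transferred 2400 results, page holds 100
        Console.WriteLine($"Transferred {transferred} results, page holds {page.Count}");
    }
}
```

With three shards, 2400 results are transferred and sorted in order to return a single page of 100, which matches the numbers in the example above.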

Timing Queries

  • The duration of queries and query parts (e.g. optimization or execution time) can be measured using the API or Studio.

  • In a sharded database, the timings for each part will be provided per shard.

  • Timing is disabled by default, to avoid the measuring overhead.
    It can be enabled per query by adding include timings() to an RQL query or calling Timings() in your query code, as explained in include query timings.

  • To view the query timings in the Studio, open the Query View,
    run an RQL query with include timings(), and open the Timings tab.

Timing Shards Querying

  1. Textual view of query parts and their duration.
    Point the mouse cursor at captions to display timing properties in the graphical view on the right.
  2. Per-shard Timings
  3. Graphical View
    Point the mouse cursor at graph sections to display query parts duration:
    A. Shard #0 overall query time
    B. Shard #0 optimization period
    C. Shard #0 query period
    D. Shard #0 staleness period

Unsupported Querying Features

Querying features that are unsupported or yet unimplemented on sharded databases include:

  • Loading a document that resides on another shard
    An index or a query can only load a document if it resides on the same shard.
    If the requested document is stored on a different shard, RavenDB will not load it and the result will be null.
  • Loading a document within a map-reduce projection
    Read more about this topic above.
  • Streaming Map-Reduce results
    Streaming map-reduce results is not supported in a Sharded Database.
  • Using limit with PatchByQueryOperation or DeleteByQueryOperation
    Attempting to set a limit when executing PatchByQueryOperation or DeleteByQueryOperation will throw a NotSupportedInShardingException exception, specifying "Query with limit is not supported in patch / delete by query operation".
  • MoreLikeThis
    This method is not supported in a Sharded Database.