Indexes: Map-Reduce Indexes


  • Map-Reduce indexes perform complex data aggregation whose results can be queried at very little cost, regardless of the data size.

  • To expedite queries and prevent performance degradation during queries, the aggregation is done during the indexing phase, not at query time.

  • Once new data enters the database, or existing documents are modified,
    the Map-Reduce index will re-calculate the aggregated data so that the aggregation results are always available and up-to-date.

  • The aggregation computation is done in two separate consecutive actions:

    • The map stage:
      This first stage runs the defined Map function(s) on each document, indexing the specified fields.
    • The reduce stage:
      This second stage groups the entries that were indexed in the Map stage by the specified field(s),
      and then runs the Reduce function to get a final aggregation result per field value.
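The two stages can be pictured with a small plain-Python sketch. It does not use the RavenDB client: the sample documents and the map_stage and reduce_stage helpers are hypothetical and only mimic, in memory, what an index does during indexing.

```python
from collections import defaultdict

# Hypothetical sample documents; the field names are illustrative only.
products = [
    {"name": "Chai", "category": "Beverages"},
    {"name": "Ikura", "category": "Seafood"},
    {"name": "Konbu", "category": "Seafood"},
]


def map_stage(docs):
    """Map: emit one entry per document, keeping only the fields to aggregate."""
    return [{"category": d["category"], "count": 1} for d in docs]


def reduce_stage(entries):
    """Reduce: group the mapped entries by category and sum the counts."""
    totals = defaultdict(int)
    for entry in entries:
        totals[entry["category"]] += entry["count"]
    return [{"category": c, "count": n} for c, n in sorted(totals.items())]


counts = reduce_stage(map_stage(products))
print(counts)
```

In a real index this work happens on the server at indexing time, so a query only reads the already aggregated entries.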

Creating Map Reduce Indexes

When it comes to index creation, the only difference between simple indexes and the map-reduce ones is an additional reduce function defined in the index definition.
To deploy an index we need to create a definition and deploy it using one of the ways described in the creating and deploying article.


Example I - Count

Let's assume that we want to count the number of products for each category.
To do it, we can create the following index using LoadDocument inside:

class Products_ByCategory(AbstractIndexCreationTask):
    class Result:
        def __init__(self, category: str = None, count: int = None):
            self.category = category
            self.count = count

    def __init__(self):
        super().__init__()
        self.map = (
            "docs.Products.Select(product => new { "
            "    Product = product,"
            '    CategoryName = (this.LoadDocument(product.Category, "Categories")).Name '
            "}).Select(this0 => new { "
            "    Category = this0.CategoryName, "
            "    Count = 1 "
            "})"
        )
        self.reduce = (
            "results.GroupBy(result => result.Category).Select(g => new {"
            "    Category = g.Key, "
            "    Count = Enumerable.Sum(g, x => ((int) x.Count)) "
            "})"
        )

class Products_ByCategory(AbstractJavaScriptIndexCreationTask):
    class Result:
        def __init__(self, category: str = None, count: int = None):
            self.category = category
            self.count = count

    def __init__(self):
        super().__init__()
        self.maps = {
            """
            map('products', function(p){
                return {
                    Category: load(p.Category, 'Categories').Name,
                    Count: 1
                }
            })
            """
        }

        self.reduce = """groupBy(x => x.Category)
                            .aggregate(g => {
                                return {
                                    Category: g.key,
                                    Count: g.values.reduce((count, val) => val.Count + count, 0)
                                };
                            })"""

and issue the query:

results = list(
    session.query_index_type(Products_ByCategory, Products_ByCategory.Result).where_equals(
        "category", "Seafood"
    )
)

from 'Products/ByCategory'
where Category == 'Seafood'

The above query will return one result for Seafood with the appropriate number of products from that category.


Example II - Average

In this example, we will calculate the average product price for each category.
The index definition:

class Products_Average_ByCategory(AbstractIndexCreationTask):
    class Result:
        def __init__(
            self, category: str = None, price_sum: float = None, price_average: float = None, product_count: int = None
        ):
            self.category = category
            self.price_sum = price_sum
            self.price_average = price_average
            self.product_count = product_count

    def __init__(self):
        super().__init__()
        self.map = """
                   docs.Products.Select(product => new {
                       Product = product,
                       CategoryName = (this.LoadDocument(product.Category, "Categories")).Name
                   }).Select(this0 => new {
                       Category = this0.CategoryName,
                       PriceSum = this0.Product.PricePerUnit,
                       PriceAverage = 0,
                       ProductCount = 1
                   })
                   """
        self.reduce = """
                      results.GroupBy(result => result.Category).Select(g => new {
                          g = g,
                          ProductCount = Enumerable.Sum(g, x => ((int) x.ProductCount))
                      }).Select(this0 => new {
                          this0 = this0,
                          PriceSum = Enumerable.Sum(this0.g, x0 => ((decimal) x0.PriceSum))
                      }).Select(this1 => new {
                          Category = this1.this0.g.Key,
                          PriceSum = this1.PriceSum,
                          PriceAverage = this1.PriceSum / ((decimal) this1.this0.ProductCount),
                          ProductCount = this1.this0.ProductCount
                      })
                      """

class Products_Average_ByCategory(AbstractJavaScriptIndexCreationTask):
    class Result:
        def __init__(
            self, category: str = None, price_sum: float = None, price_average: float = None, product_count: int = None
        ):
            self.category = category
            self.price_sum = price_sum
            self.price_average = price_average
            self.product_count = product_count

    def __init__(self):
        super().__init__()
        self.maps = {
            """
            map('products', function(product){
                        return {
                            Category: load(product.Category, 'Categories').Name,
                            PriceSum: product.PricePerUnit,
                            PriceAverage: 0,
                            ProductCount: 1
                        }
                    })
            """
        }

        self.reduce = """
        groupBy(x => x.Category)
                            .aggregate(g => {
                                var pricesum = g.values.reduce((sum,x) => x.PriceSum + sum,0);
                                var productcount = g.values.reduce((sum,x) => x.ProductCount + sum,0);
                                return {
                                    Category: g.key,
                                    PriceSum: pricesum,
                                    ProductCount: productcount,
                                    PriceAverage: pricesum / productcount
                                }
                            })
        """

and the query:

results = list(
    session.query_index_type(
        Products_Average_ByCategory, Products_Average_ByCategory.Result
    ).where_equals("category", "Seafood")
)

from 'Products/Average/ByCategory'
where Category == 'Seafood'
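One way to read the index above: the map emits a price sum and a product count next to a placeholder average, so the average can always be recomputed from those two totals. The plain-Python sketch below illustrates why that matters. It assumes, as is common for map-reduce engines, that the reduce function may be run again over its own earlier output. The reduce_entries helper and the sample prices are hypothetical.

```python
def reduce_entries(entries):
    """Combine the entries of one category into a single aggregated entry."""
    price_sum = sum(e["price_sum"] for e in entries)
    product_count = sum(e["product_count"] for e in entries)
    return {
        "price_sum": price_sum,
        "product_count": product_count,
        # The average is always derived from the two running totals.
        "price_average": price_sum / product_count,
    }


# Hypothetical mapped entries for three products in the same category.
mapped = [
    {"price_sum": 10.0, "product_count": 1, "price_average": 0},
    {"price_sum": 20.0, "product_count": 1, "price_average": 0},
    {"price_sum": 60.0, "product_count": 1, "price_average": 0},
]

# Reducing everything at once ...
all_at_once = reduce_entries(mapped)

# ... gives the same result as reducing a partial result with a new entry.
partial = reduce_entries(mapped[:2])
in_two_steps = reduce_entries([partial, mapped[2]])

# Averaging the averages instead would give a different (wrong) number.
naive = (partial["price_average"] + mapped[2]["price_sum"]) / 2
```

Here all_at_once and in_two_steps agree on an average of 30.0, while the naive average of averages yields 37.5.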

Example III - Calculations

This example illustrates how we can put some calculations inside an index using one of the indexes available in the sample database (Product/Sales).

We want to know how many times each product was ordered and how much we earned for it.
To extract that information, we need to define the following index:

class Product_Sales(AbstractIndexCreationTask):
    class Result:
        def __init__(self, product: str = None, count: int = None, total: float = None):
            self.product = product
            self.count = count
            self.total = total

    def __init__(self):
        super().__init__()
        self.map = """
                   docs.Orders.SelectMany(order => order.Lines, (order, line) => new { 
                       Product = line.Product, 
                       Count = 1, 
                       Total = (((decimal) line.Quantity) * line.PricePerUnit) * (1M - line.Discount) 
                   })
                   """
        self.reduce = """
                      results.GroupBy(result => result.Product).Select(g => new { 
                          Product = g.Key,
                          Count = Enumerable.Sum(g, x => ((int) x.Count)),
                          Total = Enumerable.Sum(g, x0 => ((decimal) x0.Total))
                      })
                      """

class Product_Sales(AbstractJavaScriptIndexCreationTask):
    class Result:
        def __init__(self, product: str = None, count: int = None, total: float = None):
            self.product = product
            self.count = count
            self.total = total

    def __init__(self):
        super().__init__()
        self.maps = {
            """
            map('orders', function(order){
                var res = [];
                order.Lines.forEach(l => {
                    res.push({
                        Product: l.Product,
                        Count: 1,
                        Total:  (l.Quantity * l.PricePerUnit) * (1- l.Discount)
                    })
                });
                return res;
            })
            """
        }
        self.reduce = """
        groupBy(x => x.Product)
        .aggregate(g => {
            return {
                Product : g.key,
                Count: g.values.reduce((sum, x) => x.Count + sum, 0),
                Total: g.values.reduce((sum, x) => x.Total + sum, 0)
            }
        })
        """

And run the query:

results = list(session.query_index_type(Product_Sales, Product_Sales.Result))

from 'Product/Sales'
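The calculation inside the map can be checked by hand with a short plain-Python sketch. It fans each order out into one entry per order line, as the index does, and uses Decimal to mirror the decimal arithmetic of the index. The sample orders and the map_order and reduce_sales helpers are hypothetical.

```python
from collections import defaultdict
from decimal import Decimal

# Hypothetical orders; only the fields used by the index are included.
orders = [
    {"lines": [
        {"product": "products/1-A", "quantity": 2,
         "price_per_unit": Decimal("10.00"), "discount": Decimal("0.25")},
        {"product": "products/2-A", "quantity": 1,
         "price_per_unit": Decimal("4.00"), "discount": Decimal("0")},
    ]},
    {"lines": [
        {"product": "products/1-A", "quantity": 1,
         "price_per_unit": Decimal("10.00"), "discount": Decimal("0")},
    ]},
]


def map_order(order):
    """Emit one entry per order line: quantity * unit price * (1 - discount)."""
    return [
        {
            "product": line["product"],
            "count": 1,
            "total": line["quantity"] * line["price_per_unit"] * (1 - line["discount"]),
        }
        for line in order["lines"]
    ]


def reduce_sales(entries):
    """Sum the count and total of all entries that share a product."""
    sales = defaultdict(lambda: {"count": 0, "total": Decimal("0")})
    for entry in entries:
        sales[entry["product"]]["count"] += entry["count"]
        sales[entry["product"]]["total"] += entry["total"]
    return dict(sales)


entries = [entry for order in orders for entry in map_order(order)]
sales = reduce_sales(entries)
```

With this data, products/1-A was ordered twice for a total of 25 (15 after the discount, plus 10).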

Creating Multi-Map-Reduce Indexes

A Multi-Map-Reduce index allows aggregating (or 'reducing') data from several collections.

They can be created and edited via Studio, or with the API as shown below.

In the following code sample, we want the number of companies, suppliers, and employees per city.
We define the map phase on collections 'Employees', 'Companies', and 'Suppliers'.
We then define the reduce phase.

class Cities_Details(AbstractMultiMapIndexCreationTask):
    class IndexEntry:
        def __init__(self, city: str = None, companies: int = None, employees: int = None, suppliers: int = None):
            self.city = city
            self.companies = companies
            self.employees = employees
            self.suppliers = suppliers

    def __init__(self):
        super().__init__()
        self._add_map(
            """
            from e in docs.Employees
            select new {
                city = e.Address.City,
                companies = 0,
                suppliers = 0,
                employees = 1
            }
            """
        )

        self._add_map(
            """
            from c in docs.Companies
            select new {
                city = c.Address.City,
                companies = 1,
                suppliers = 0,
                employees = 0
            }
            """
        )

        self._add_map(
            """
            from s in docs.Suppliers
            select new {
                city = s.Address.City,
                companies = 0,
                suppliers = 1,
                employees = 0
            }
            """
        )

        self.reduce = """
        from result in results
        group result by result.city
        into g
        select new {
            city = g.Key,
            companies = g.Sum(x => x.companies),
            suppliers = g.Sum(x => x.suppliers),
            employees = g.Sum(x => x.employees)
        }
        """

A query on the index:

# Queries the index "Cities_Details" - filters "Companies" results and orders by "City".
commerce_details = list(
    session.query_index_type(Cities_Details, Cities_Details.IndexEntry)
    .where_greater_than("companies", 5)
    .order_by("city")
)

You can see this sample described in detail in Inside RavenDB - Multi-Map-Reduce Indexes.
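As a rough mental model of the multi-map-reduce flow, the plain-Python sketch below runs one map per collection and feeds all the entries into a single reduce. Every map emits the same set of fields, with 1 in its own column and 0 in the others, which is what lets one reduce function sum them. The sample data and helper names are hypothetical.

```python
# Hypothetical sample collections, reduced to the one field the index reads.
employees = [{"city": "London"}, {"city": "Seattle"}]
companies = [{"city": "London"}, {"city": "London"}]
suppliers = [{"city": "Seattle"}]


def map_collection(docs, field):
    """Emit entries of identical shape; only the given field is set to 1."""
    entries = []
    for doc in docs:
        entry = {"city": doc["city"], "companies": 0, "suppliers": 0, "employees": 0}
        entry[field] = 1
        entries.append(entry)
    return entries


def reduce_by_city(entries):
    """Group the entries of all maps by city and sum each counter."""
    result = {}
    for entry in entries:
        totals = result.setdefault(
            entry["city"], {"companies": 0, "suppliers": 0, "employees": 0}
        )
        for key in totals:
            totals[key] += entry[key]
    return result


all_entries = (
    map_collection(employees, "employees")
    + map_collection(companies, "companies")
    + map_collection(suppliers, "suppliers")
)
per_city = reduce_by_city(all_entries)
```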

Reduce Results as Artificial Documents

Map-Reduce Output Documents

In addition to storing the aggregation results in the index, the map-reduce index can also output those reduce results as documents to a specified collection. In order to create these documents, called "artificial", you need to define the target collection using the output_reduce_to_collection property in the index definition.

Writing map-reduce outputs into documents allows you to define additional indexes on top of them that give you the option to create recursive map-reduce operations. This makes it cheap and easy to, for example, recursively create daily, monthly, and yearly summaries on the same data.

In addition, you can also apply the usual operations on artificial documents (e.g. data subscriptions or ETL).

If the aggregation value for a given reduce key changes, we overwrite the output document. If the given reduce key no longer has a result, the output document will be removed.
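The overwrite and removal rules can be pictured with a dictionary keyed by reduce key. This is only a model of the described behavior, not the server's implementation. The sync_output_documents helper and the sample keys are hypothetical.

```python
def sync_output_documents(stored, reduce_results):
    """Bring the stored output documents in line with the latest reduce results.

    stored:          reduce key -> output document (modified in place)
    reduce_results:  reduce key -> current aggregated value
    """
    # A reduce key that no longer has a result loses its output document.
    for key in list(stored):
        if key not in reduce_results:
            del stored[key]
    # New or changed results overwrite whatever was stored for that key.
    for key, value in reduce_results.items():
        stored[key] = value
    return stored


# Hypothetical state: "Seafood" changes, "Produce" disappears, "Dairy" is new.
stored = {"Seafood": {"count": 11}, "Produce": {"count": 5}}
latest = {"Seafood": {"count": 12}, "Dairy": {"count": 10}}
sync_output_documents(stored, latest)
```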

Reference Documents

To help organize these output documents, the map-reduce index can also create an additional collection of artificial reference documents. These documents aggregate the output documents and store their document IDs in an array field ReduceOutputs.

The document IDs of reference documents are customized to follow some pattern. The format you give to their document ID also determines how the output documents are grouped.

Because reference documents have well known, predictable IDs, they are easier to plug into indexes and other operations, and can serve as an intermediary for the output documents whose IDs are less predictable. This allows you to chain map-reduce indexes in a recursive fashion, see Example II.
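The relationship between output documents and reference documents can be sketched in plain Python. The output document IDs below use made-up hash parts, and the build_reference_documents helper is hypothetical. It groups output document IDs under a predictable, date-based reference ID and stores them in a ReduceOutputs list, as described above.

```python
from collections import defaultdict
from datetime import date

# Hypothetical output documents; the trailing hash parts are invented.
output_docs = {
    "DailyProductSales/35/111": {"Product": "products/1-A", "Date": date(1998, 5, 6)},
    "DailyProductSales/35/222": {"Product": "products/2-A", "Date": date(1998, 5, 6)},
    "DailyProductSales/35/333": {"Product": "products/1-A", "Date": date(1998, 5, 7)},
}


def build_reference_documents(docs):
    """Group output document IDs under one reference document per day."""
    grouped = defaultdict(list)
    for doc_id, doc in docs.items():
        # The reference ID is predictable: it depends only on the date.
        reference_id = f"sales/daily/{doc['Date']:%Y-%m-%d}"
        grouped[reference_id].append(doc_id)
    return {ref_id: {"ReduceOutputs": ids} for ref_id, ids in grouped.items()}


references = build_reference_documents(output_docs)
```

A consumer that knows the date can load sales/daily/1998-05-06 directly and follow ReduceOutputs to the output documents, without knowing their hashes.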

Learn more about how to configure output and reference documents in the Studio: Create Map-Reduce Index.


Artificial Document Properties

IDs

The identifiers of map reduce output documents have three components in this format:

<Output collection name>/<incrementing value>/<hash of reduce key values>

The index in Example I might generate an output document ID like this:

DailyProductSales/35/14369232530304891504

  • "DailyProductSales" is the collection name specified for the output documents.
  • The middle part is an incrementing integer assigned by the server. This number grows by some amount whenever the index definition is modified. This can be useful because when an index definition changes, there is a brief transition phase when the new output documents are being created, but the old output documents haven't been deleted yet (this phase is called "side-by-side indexing"). During this phase, the output collection contains output documents created both by the old version and the new version of the index, and they can be distinguished by this value: the new output documents will always have a higher value (by 1 or more).
  • The last part of the document ID (the unique part) is the hash of the reduce key values - in this case: hash(Product, Month).
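A small helper can split such an ID back into its three components, for example to tell old and new output documents apart during side-by-side indexing. This is a sketch: parse_output_document_id is a hypothetical helper, and the second ID below is an invented example of a document written by a newer index version.

```python
def parse_output_document_id(doc_id):
    """Split '<collection>/<incrementing value>/<hash>' into its parts.

    Splitting from the right keeps collection names that contain '/' intact.
    """
    collection, counter, key_hash = doc_id.rsplit("/", 2)
    return {"collection": collection, "counter": int(counter), "hash": key_hash}


old = parse_output_document_id("DailyProductSales/35/14369232530304891504")
# Hypothetical ID for the same reduce key after the index definition changed.
new = parse_output_document_id("DailyProductSales/36/14369232530304891504")

# The document written by the newer index version has the higher counter.
newest = max([old, new], key=lambda parts: parts["counter"])
```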

The identifiers of reference documents follow some pattern you choose, and this pattern determines which output documents are held by a given reference document.

The index in Example I has this pattern for reference documents:

sales/daily/{Date:yyyy-MM-dd}

And this produces reference document IDs like this:

sales/daily/1998-05-06

The pattern is built using the same syntax as the StringBuilder.AppendFormat method. See here to learn about the date formatting in particular.
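To see how such a pattern expands, here is a plain-Python approximation. It is not the server's formatter: format_reference_id is a hypothetical helper that supports only the yyyy, MM, and dd tokens by translating them to their strftime equivalents.

```python
import re
from datetime import datetime

# Only these .NET-style date tokens are handled in this sketch.
_TOKENS = {"yyyy": "%Y", "MM": "%m", "dd": "%d"}


def format_reference_id(pattern, fields):
    """Expand '{Field}' and '{Field:format}' placeholders from a dict of fields."""

    def replace(match):
        name, fmt = match.group(1), match.group(2)
        value = fields[name]
        if fmt is None:
            return str(value)
        for token, directive in _TOKENS.items():
            fmt = fmt.replace(token, directive)
        return value.strftime(fmt)

    return re.sub(r"\{(\w+)(?::([^}]*))?\}", replace, pattern)


reference_id = format_reference_id(
    "sales/daily/{Date:yyyy-MM-dd}", {"Date": datetime(1998, 5, 6)}
)
print(reference_id)
```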


Metadata

Artificial documents generated by map-reduce indexes get the following @flags in their metadata:

"@flags": "Artificial, FromIndex"

These flags are used internally by the database to filter out artificial documents during replication.
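Client code that processes document metadata as a plain dictionary could use the same flags to skip artificial documents. The is_artificial helper below is hypothetical and only assumes that @flags is a comma-separated string, as shown above.

```python
def is_artificial(metadata):
    """Return True if the metadata's @flags value includes 'Artificial'."""
    flags = {flag.strip() for flag in metadata.get("@flags", "").split(",")}
    return "Artificial" in flags


artificial = is_artificial({"@flags": "Artificial, FromIndex"})
# A metadata dictionary without @flags is treated as a regular document.
regular = is_artificial({"@collection": "Products"})
```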


Syntax

The map-reduce output documents are configured with these properties of IndexDefinition:

self._output_reduce_to_collection = output_reduce_to_collection
self._pattern_references_collection_name = pattern_references_collection_name
self._pattern_for_output_reduce_to_collection_references = pattern_for_output_reduce_to_collection_references

Parameters | Type | Description
_output_reduce_to_collection | str | Collection name for the output documents.
_pattern_references_collection_name | str | Optional collection name for the reference documents - by default it is OutputReduceToCollection/References
_pattern_for_output_reduce_to_collection_references | str | Document ID format for reference documents. This ID references the fields of the reduce function output, which determines how the output documents are aggregated. The type of this parameter is different depending on if the index is created using IndexDefinition or AbstractIndexCreationTask.

Example:

Here is a map-reduce index with output documents and reference documents:

class Product_Sales_ByMonth(AbstractIndexCreationTask):
    class Result:
        def __init__(
            self, product: str = None, month: datetime.datetime = None, count: int = None, total: float = None
        ):
            self.product = product
            self.month = month
            self.count = count
            self.total = total

    def __init__(self):
        super().__init__()
        self.map = """
                   docs.Orders.SelectMany(order => order.Lines, (order, line) => new { 
                       Product = line.Product, 
                       Month = new DateTime(order.OrderedAt.Year, order.OrderedAt.Month, 1), 
                       Count = 1, 
                       Total = (((decimal) line.Quantity) * line.PricePerUnit) * (1M - line.Discount) 
                   })
                   """

        self.reduce = """
                      results.GroupBy(result => new { 
                          Product = result.Product, 
                          Month = result.Month 
                      }).Select(g => new { 
                          Product = g.Key.Product,
                          Month = g.Key.Month,
                          Count = Enumerable.Sum(g, x => ((int) x.Count)),
                          Total = Enumerable.Sum(g, x0 => ((decimal) x0.Total))
                      })
                      """
        self._output_reduce_to_collection = "MonthlyProductSales"
        self._pattern_references_collection_name = "MonthlyProductSales/References"
        self._pattern_for_output_reduce_to_collection_references = "sales/monthly/{Month:yyyy-MM-dd}"

class Product_Sales_ByDate(AbstractIndexCreationTask):
    def create_index_definition(self) -> IndexDefinition:
        return IndexDefinition(
            maps={
                """
                from order in docs.Orders
                from line in order.Lines
                select new {
                    line.Product,
                    Date = order.OrderedAt,
                    Profit = line.Quantity * line.PricePerUnit * (1 - line.Discount)
                };
                """
            },
            reduce="""
            from r in results
            group r by new { r.Date, r.Product }
            into g
            select new {
                Product = g.Key.Product,
                Date = g.Key.Date,
                Profit = g.Sum(r => r.Profit)
            };
            """,
            output_reduce_to_collection="DailyProductSales",
            pattern_references_collection_name="DailyProductSales/References",
            pattern_for_output_reduce_to_collection_references="sales/daily/{Date:yyyy-MM-dd}",
        )

In the first index example above (Product_Sales_ByMonth), the reference document ID pattern is set with the expression:

self._pattern_for_output_reduce_to_collection_references = "sales/monthly/{Month:yyyy-MM-dd}"

This gives the reference documents IDs in this general format: sales/monthly/1998-05-01. The reference document with that ID contains the IDs of all the output documents from the month of May 1998.

In the second index example (Product_Sales_ByDate, which builds an IndexDefinition), the reference document ID pattern is set with a string:

pattern_for_output_reduce_to_collection_references="sales/daily/{Date:yyyy-MM-dd}"

This gives the reference documents IDs in this general format: sales/daily/1998-05-06. The reference document with that ID contains the IDs of all the output documents from May 6th 1998.

Remarks

Saving documents

Artificial documents are stored immediately after the indexing transaction completes.


Recursive indexing loop

It is forbidden to output reduce results to collections such as the following:

  • A collection that the current index is already working on.
    E.g., an index on a DailyInvoices collection outputs to DailyInvoices.
  • A collection that the current index is loading a document from.
    E.g., an index with LoadDocument(id, "Invoices") outputs to Invoices.
  • Two collections, each processed by a map-reduce index,
    when each index outputs to the other collection.
    E.g.,
    An index on the Invoices collection outputs to the DailyInvoices collection,
    while an index on DailyInvoices outputs to Invoices.

When an attempt to create such an infinite indexing loop is detected, a detailed error is generated.
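The three forbidden cases are all cycles in a graph whose edges lead from each collection an index reads (by mapping it or by loading documents from it) to the collection it outputs to. The plain-Python sketch below illustrates that idea only. It is not how the server performs its check, and the find_output_loop helper and the index descriptions are hypothetical.

```python
def find_output_loop(indexes):
    """Return a list of collections forming a loop, or None if there is none.

    indexes: list of dicts with 'reads' (set of collections that are mapped
    or loaded from) and 'output' (the output collection, or None).
    """
    # Edge: a collection the index reads -> the collection it writes to.
    edges = {}
    for index in indexes:
        if index["output"] is None:
            continue
        for source in index["reads"]:
            edges.setdefault(source, set()).add(index["output"])

    def visit(node, path):
        if node in path:
            return path[path.index(node):] + [node]
        for target in sorted(edges.get(node, ())):
            loop = visit(target, path + [node])
            if loop:
                return loop
        return None

    for start in sorted(edges):
        loop = visit(start, [])
        if loop:
            return loop
    return None


# Two indexes that output into each other's source collection.
mutual = find_output_loop([
    {"reads": {"Invoices"}, "output": "DailyInvoices"},
    {"reads": {"DailyInvoices"}, "output": "Invoices"},
])

# A chain of daily and monthly summaries contains no loop.
chain = find_output_loop([
    {"reads": {"Orders"}, "output": "DailyProductSales"},
    {"reads": {"DailyProductSales"}, "output": "MonthlyProductSales"},
])
```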


Output to an Existing collection

Creating a map-reduce index that defines an output collection which already exists and contains documents will result in an error.
Delete all documents from the target collection before creating the index, or output results to a different collection.


Modification of Artificial Documents

Artificial documents can be loaded and queried just like regular documents.
However, it is not recommended to edit artificial documents manually since any index results update would overwrite all manual modifications made in them.


Map-Reduce Indexes on a Sharded Database

On a sharded database, the behavior of map-reduce indexes is altered in a few ways that database operators should be aware of.

  • Read here about map-reduce indexes on a sharded database.
  • Read here about querying map-reduce indexes on a sharded database.