Inside RavenDB 4.0

MapReduce and Aggregations in RavenDB

MapReduce is an old term. It came from Lisp and was used as early as the 1960s. For a long time, it was primarily known only by functional language aficionados, and it was rarely seen outside their circles. In 2004, the Google paper "MapReduce: Simplified Data Processing on Large Clusters" was released, and MapReduce was instantly a big hit in distributed programming circles. Everyone had to have a distributed MapReduce implementation.

RavenDB is a distributed database, and it uses MapReduce. However, it doesn't do so in the context of distributed computing. Instead, RavenDB uses MapReduce for aggregation of data on each node independently. If you're used to MapReduce jobs that run on large clusters processing terabytes of data, this might look very strange. But RavenDB isn't using MapReduce to break apart large computations across different machines. Instead, it uses MapReduce to break apart computations across time.

It's usually easier to explain with an example, so let's jump into that.

Executing simple aggregations

Listing 11.1 shows a simple aggregation query, giving us the total number of orders and items purchased by a particular company.

Listing 11.1 A simple aggregation query in RavenDB


from Orders as o
group by o.Company
where o.Company = 'companies/1-A'
select count() as NumberOfOrders, 
       sum(o.Lines[].Quantity) as ItemsPurchased, 
       o.Company

The query in Listing 11.1 should be familiar to anyone who's used SQL before. Now, let's analyze what RavenDB must do in order to answer this kind of query:

  1. Find all the Orders documents where the Company field is set to 'companies/1-A'.
  2. Iterate over these documents, count their number and sum the quantity of items in each.
  3. Return the results to the client.
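
To make the cost concrete, here's a naive client-side sketch of those three steps (illustrative only; the Order class follows the sample data's shape, and allOrders stands in for a full collection scan):

// Step 1: scan for the matching orders (expensive without an index)
var orders = allOrders.Where(o => o.Company == "companies/1-A").ToList();
// Step 2: count them and sum the quantities across all line items
var numberOfOrders = orders.Count;
var itemsPurchased = orders.Sum(o => o.Lines.Sum(l => l.Quantity));
// Step 3: return numberOfOrders and itemsPurchased to the client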

This seems quite straightforward, but it has a couple of issues — in particular, the first two steps. The sample data we're using contains a little over a thousand documents, so there isn't much we can do to make any query expensive over this dataset. However, with real-world datasets, we'll typically deal with collections that contain hundreds of thousands to many millions of documents.

Consider the first step, which is finding all the Orders documents with the Company field set to a value. If I have a few million documents to scan, that alone can be quite expensive, both in I/O (to read the data from disk) and computation (to check equality so often). This is obviously something that we can optimize with an index. It's the next step that's hard to work with.

If a company has a lot of orders, then the process of iterating over each of these orders can be extraordinarily expensive. Consider the case where you want to show the number of orders and the number of items purchased on the company page. This is a small amount of information that can tell you how important this particular customer is.

However, if we need to repeat the second step each time we run the query, that can get expensive very quickly. What's worse is that the more important the company, the more orders and items this particular customer purchased from us and the slower things will become. This method won't work. It's punishing success, and that isn't something we want.

Typically, you won't run such queries directly because of their cost and time to run. Instead, you'll write a job that will run these queries at idle times and cache the results. Then you only have to deal with cache invalidation, making sure that this job is never run under load, explaining to users that the results are delayed and so on and so forth. Suddenly, this is a big task. It's complex and will take time to complete.

But all you really wanted was to put a couple of numbers on a page, and suddenly you need to deploy a background job, monitor its cost and execution and wait until the daily or weekly run to get updated numbers. Such a high cost of aggregation will usually cause such features to be dropped.

And even if you don't have enough data to require such measures, aggregation queries are still typically costly for the database — enough so that they're used sparingly in most systems.

RavenDB's aggregation doesn't work like this.

The gory details of aggregation in RavenDB

Instead of gathering all the data and then aggregating it all in place, RavenDB uses MapReduce to break apart the aggregation computation into discrete steps: the "map" and the "reduce."

Let's look at how RavenDB actually processes the query in Listing 11.1. Run the query in the listing and then click on the index button in the results, as shown in Figure 11.1.

We're going to go deep into how RavenDB handles aggregations. You won't typically need this level of detail, and you should feel free to skip this section if you don't think you need it. I'm including it here because it's important to understand the implications of how things work and why aggregation queries in RavenDB are so cheap.

Figure 11.1 Getting to the index details on the query page


You can already see that an aggregation query in RavenDB is also using an index. In addition to the usual options, such as the index terms and performance statistics, there's also the MapReduce visualizer, which we'll look at in more detail later. For now, click on the View Index option, which should open the index details page shown in Figure 11.2.

Figure 11.2 A MapReduce index aggregation Orders by Company


Figure 11.2 shows the structure of the index. Operating over Orders, it groups by the Company and aggregates over them. One thing to note here is that there's no mention anywhere of companies/1-A. Even though the query in Listing 11.1 mentioned it, the index isn't operating on that particular value but rather on the generic concept of aggregating by the Company field.

In other words, as usual, RavenDB looked at the query and generalized the operation to answer any question using Company. But what about the aggregation?

Aggregation is actually handled via two separate actions, map and reduce. The first stage runs a map operation on each of the documents, grabbing just Company, sum(Lines[].Quantity) and Count = 1 from each of the Orders documents. The second stage is to group them all by Company and run the reduce operation to get the final result by each of the Company field values.
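
Conceptually, the two stages of this auto index look something like the following sketch, written in the same LINQ style we'll use for static indexes later in this chapter (the actual generated index uses its own field names):

// map (conceptual sketch)
from o in docs.Orders
select new
{
    o.Company,
    Count = 1,
    Quantity = o.Lines.Sum(l => l.Quantity)
}

// reduce (conceptual sketch)
from r in results
group r by r.Company into g
select new
{
    Company = g.Key,
    Count = g.Sum(x => x.Count),
    Quantity = g.Sum(x => x.Quantity)
}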

If this doesn't make sense to you, don't worry. RavenDB contains a few tools specifically to help you understand how the MapReduce process works.

In the Indexes menu item, go to Map-Reduce Visualizer and then select the Auto/Orders/ByCountAndLines[].QuantityReducedByCompany index from the dropdown. Then search for the following document IDs: orders/396-A and orders/445-A. The result should look similar to Figure 11.3.

Figure 11.3 The MapReduce visualizer allows us to inspect the internal structure of the index


Clicking on the {"Company":"companies/1-A"} rectangle in Figure 11.3 will give us more details about that particular value, as you can see in Figure 11.4.

Figure 11.4 A single reduce result and the total reduced value for a MapReduce index


With the details in Figures 11.3 and 11.4, you can now see exactly what I mean when I talk about map output and the resulting aggregation. The documents we selected (orders/396-A and orders/445-A) both belong to companies/1-A, and we can see that for orders/396-A, the map output was {"Company": "companies/1-A", "Count": 1, "Lines[].Quantity": 38}. Indeed, if we go and inspect the document, we'll see three line items, with quantities of 15, 21 and 2, totaling 38. For orders/445-A, we can see that the total quantity is 20, with a single line item.

This is interesting, but what's even more interesting is the aggregated value. For companies/1-A, you can see the aggregated values of a total of six orders for this company, with a final quantity of 174 items ordered. Clicking on the aggregation summary will take us even further down, into the individual page entries, as you can see in Figure 11.5.

Figure 11.5 Individual mapped entries for companies/1-A


In Figure 11.5, you can see all the details for each of the entries for companies/1-A. This is the lowest level we need to inspect, and it's also the key to understanding why RavenDB implements aggregation in this manner. I've already spent several pages just explaining what's going on. Why is RavenDB's aggregation implementation so complex?

The reason is quite simple, actually. We don't run the aggregation on every query; instead, we compute the aggregation result once and then store it. When we issued the query in Listing 11.1, we queried only for results for companies/1-A, but the index that the query optimizer generated for us applies to all companies.

In fact, if we now run the same query but for companies/2-A, we'll be able to reuse the same index. And in fact, we'll have to do very little work. The query will use the existing index and fetch the already precomputed results for companies/2-A. It won't have to actually perform any aggregation whatsoever. All the work has already been done.
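
Concretely, that query looks like this:

from Orders as o
group by o.Company
where o.Company = 'companies/2-A'
select count() as NumberOfOrders, 
       sum(o.Lines[].Quantity) as ItemsPurchased, 
       o.Company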

As great as that is, you might be asking yourself why there's this level of complexity. After all, surely we could have done the same thing without so many moving parts, right? That's correct, but there's one additional item that you need to consider. How are we going to handle updates?

The MapReduce indexes in RavenDB aren't simply a cache of the already computed results. Instead, we store the data in such a way that makes it cheap to also update the results. Consider what will happen inside RavenDB when a new order comes in. We'll run the map portion of the index, getting the Company, sum(Lines[].Quantity) and Count = 1 from the newly created document.

The easiest way to visualize that is to add another row to Figure 11.5. At this point, RavenDB can then just aggregate the new results alongside the already existing result and get to the final tally. In other words, the complexity here exists in order to allow RavenDB to efficiently update MapReduce results when documents are created or updated.

This works great when we have a small number of items to aggregate. Such is the case with companies/1-A. But what happens when the number of items grows? Let's increase the number of documents we're aggregating a hundredfold and see where that takes us. Go to Documents and then Patch and run the update script in Listing 11.2.

Listing 11.2 Increasing the number of Orders documents a hundredfold


from Orders 
update {
    for (var i = 0; i < 100; i++ ){
        put("orders/", this);
    }
}

After running this script, we should have 83,830 Orders documents in the database (the 830 original Orders plus the hundred copies of each that we just created). Where we previously had six entries for companies/1-A, we now have 606. Let's look at how that works.

Go back to the MapReduce visualizer and select the Auto/Orders/ByCountAndLines[].QuantityReducedByCompany index. Then add the orders/396-A and orders/445-A documents. The result is shown in Figure 11.6.

Figure 11.6 Showing 606 mapped entries for companies/1-A


This is very similar to how it looked before, and indeed, the structure of the data is exactly the same. This is because the number of entries for companies/1-A is still quite small. Let's select another pair of documents, this time belonging to companies/77-A, and see what kind of structure we have there. Add orders/77-A and orders/146-A to the visualizer and take a look at the results, as shown in Figure 11.7.

Figure 11.7 MapReduce works as a tree once we have enough entries for a particular value


On the left in Figure 11.7, you can see a tree structure, unlike the flat list we had for companies/1-A in Figure 11.3. Indeed, once we're past a certain size, RavenDB will start processing MapReduce entries in a treelike fashion.

Consider an update to orders/77-A and how RavenDB will apply it. First, we'll run the map on the updated document, giving us the map entry to write to #1028. Then, we'll run the reduce on that page, giving us the final tally for this page. We'll then recurse upward, toward #1437, where we'll also run the reduce.
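
In pseudocode, the whole update amounts to something like the following sketch (this illustrates the flow, not RavenDB's actual internals; the page names match Figure 11.7):

// conceptual sketch of applying an update to orders/77-A
var mapEntry = Map(updatedOrder);                      // one map invocation
leafPage1028.Write(mapEntry);                          // store the new map entry
var leafResult = Reduce(leafPage1028.Entries);         // first reduce call, on the leaf page
var finalResult = Reduce(branchPage1437.ChildResults); // second reduce call, up the tree
// finalResult is the updated aggregation for companies/77-A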

The end result is that an update operation requires a single map invocation and two reduce calls to update the final result for companies/77-A. In other words, if we have a lot of data for a particular key, RavenDB will segment this data and apply aggregation operations in such a way that the number of operations required is kept to a minimum. This ensures that we're not only able to answer queries efficiently but are also able to update the MapReduce results with very little cost.

Using aggregation in RavenDB

Aggregation operations in RavenDB are cheap, both to compute and to query. This is in stark contrast to the usual behavior of aggregation queries in other databases. The more data you have to go through, the more efficiently RavenDB will be able to process and aggregate it.

While typically you'll run aggregation as a daily or weekly report (usually during off hours) and store these results for use later, RavenDB allows you to just query the aggregated data. RavenDB will provide the answers you need, as well as keep everything up to date.
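
For example, from the .NET client, the query from Listing 11.1 can simply be issued whenever the application needs the numbers. Here's a sketch, assuming an initialized IDocumentStore named store and a hypothetical CompanySales class matching the projection:

// CompanySales is a hypothetical DTO with NumberOfOrders,
// ItemsPurchased and Company properties
using (var session = store.OpenSession())
{
    var results = session.Advanced
        .RawQuery<CompanySales>(@"
            from Orders as o
            group by o.Company
            where o.Company = $company
            select count() as NumberOfOrders,
                   sum(o.Lines[].Quantity) as ItemsPurchased,
                   o.Company")
        .AddParameter("company", "companies/1-A")
        .ToList();
}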

Defining your own MapReduce indexes

We've seen from Listing 11.1 how we can query using group by and RavenDB will generate a MapReduce index for us behind the scenes. This works quite nicely, but there's only so much that you can do with a dynamic query before the query optimizer will give up.

The query optimizer can do quite a lot, but in order to make sure that it's predictable, it's currently limited to recognizing and being able to generate MapReduce indexes from a fairly small list of predictable patterns. For more complex things, you'll need to create your own MapReduce index. Luckily, this is easy to do.

Let's say we want to get the total number of sales and the total amount we made on each product. We can't express this as a simple query, so we'll need to create a MapReduce index of our own for that. Go to the Indexes page and click on New Index. Name the index Products/Sales, then click the Add Reduction button. Listing 11.3 has the contents of the Map and Reduce fields. After you're done filling these in, click Save to create the new index.

Listing 11.3 Compute total number of sales and revenue per product


// map 
from o in docs.Orders
from l in o.Lines
select new
{
    l.Product,
    l.Quantity,
    Total = (l.Quantity * l.PricePerUnit) * (1 - l.Discount)
}


// reduce
from r in results
group r by r.Product into g
select new
{
    Product = g.Key,
    Quantity = g.Sum(x => x.Quantity),
    Total = g.Sum(x => x.Total)
}

Before we get to investigating how this index works, let's talk about what it does. The map portion runs over all the orders and all the line items for each order. Then, for each of the line items, we output an entry with the quantity sold and the amount of money we made on this product. The reduce will group all these results based on the Product field and then sum up all the final numbers for the Total and Quantity sold.

Now that we've really seen map and reduce, it should be easier to grasp how they're truly two separate operations. It's also important to understand that we aren't actually running the map or the reduce on the full results all the time. Instead, we break the results apart internally and apply map and reduce to portions of the data each time. This leads to several important restrictions on MapReduce indexes:

  • Both the map and reduce functions must be pure functions. In other words, they should have no external input, and calling them with the same input must always return the same output. In particular, usage of Random or DateTime.Now and similar calls is not allowed.
  • The output of the map is fed into the reduce. This should be obvious, but what may not be obvious is that the output of the reduce is also fed into the reduce, recursively. In particular, you should make no assumptions in the reduce about the number of calls or the amount of data that you have to process in each invocation of reduce.
  • The output of the reduce must match the output of the map. (Because both are being fed back into the reduce, they must have the same structure.) RavenDB will error if you have a different shape for each of the functions.

Because of these restrictions, RavenDB can apply the map and reduce functions in an incremental fashion and generate the results that we've already seen. This is the key to RavenDB's ability to compute aggregation cheaply over time.

We can now run the query in Listing 11.4 to find the top grossing products:

Listing 11.4 Finding the top grossing products across all orders


from index 'Products/Sales'
order by Total as double desc

You'll note that the results of this query are instantaneous. Even though we have tens of thousands of records, RavenDB only needs to look through 77 precomputed results. We can also sort by Quantity to find the most popular products.
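
That query is just as cheap, since it also runs over the precomputed results. Something along these lines:

from index 'Products/Sales'
order by Quantity as long desc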

Go into the Map-Reduce Visualizer and select the Products/Sales index. Then, enter orders/6-A and orders/10-A to see the internal structure of the MapReduce index, as shown in Figure 11.8.

Figure 11.8 Internal structure of the Products/Sales index in the MapReduce visualizer


As you can see in Figure 11.8, all of the entries are big enough to require a tree structure. Let's take a closer look at products/39-A, with Figure 11.9 zooming in on a single page inside that tree.

Figure 11.9 A single page holding over 25,000 map entries, due to high compression rate


Page #578 holds over 25,000 entries for this product. How can a single page hold so many? The answer is that RavenDB applies compression to the mapped entries. Since they're mostly similar, they have a high compression rate, allowing us to pack a lot of entries into a small amount of space. This also improves the performance of updating entries in the tree, since we don't have to do as much work and the depth of the tree is much smaller.

This is about as low-level as we'll get when discussing the MapReduce implementation. You should now have a good feel for the relative costs of using aggregation in general — and MapReduce in particular — in RavenDB. With that knowledge under your belt, let's explore some of the more interesting things that we can do with MapReduce indexes in RavenDB.

Common pitfalls with MapReduce indexes

Certain operations are not easy to perform within the limits that RavenDB places on the map and reduce functions. Probably the most obvious example is calculating an average. Let's edit the Products/Sales index to add an average. Listing 11.5 shows the updated definition with the most obvious (and wrong) way to do so:

Listing 11.5 Computing average (wrongly) in MapReduce index


// map 
from o in docs.Orders
from l in o.Lines
select new
{
    l.Product,
    l.Quantity,
    Count = 1,
    Total = (l.Quantity * l.PricePerUnit) * (1 - l.Discount),
    Average = 0, // so we'll have the same shape as the reduce
    Debug = new object[0] // same shape as reduce
}


// reduce
from r in results
group r by r.Product into g
select new
{
    Product = g.Key,
    Quantity = g.Sum(x => x.Quantity),
    Count = g.Sum(x => x.Count),
    Total = g.Sum(x => x.Total),
    Average = g.Average(x => x.Total),
    Debug = g.Select(x => x.Total).ToArray()
}

Save the updated index and then run the following query: from index 'Products/Sales' where Product = 'products/1-A'. The result is shown in Figure 11.10.

Figure 11.10 The wrong results shown for an average computation


The results shown in Figure 11.10 are wrong. The average price for a box of chai (which is what products/1-A is) is not $161,449.793. So why are we getting this result?

The answer is simple. The Average call isn't complex: it sums the input elements and then divides that sum by the number of elements. But remember that map and reduce aren't called on each individual value. Instead, they're called in batches. This means that by the time the final reduce call ran, it didn't get a flat list of all the results. Instead, it got an aggregated list. Figure 11.11 shows the Debug field from the index, which allows us to better understand what's going on.

Figure 11.11 The debug field from the Products/Sales index demonstrating the recursive nature of reduce operations


As you can see, the computation of the Average is correct for the input it received. Because we executed the reduce multiple times, by the time the final round came, we fed it seven results, each of them aggregating about 500 documents. In other words, when we write a MapReduce index, we need to take into account the fact that we never have access to the total result set. This makes sense when you think about it. You don't want a single function invocation that has to go through many tens of thousands of documents (or many more, on realistic datasets).
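
A tiny numeric example makes the problem clear: averaging in two steps gives avg(avg(1, 2), avg(3)) = avg(1.5, 3) = 2.25, while the true average of the whole set, avg(1, 2, 3), is 2. An average of averages is not the average of the underlying values.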

Don't assume that the order is fixed, either

If your MapReduce index does work that relies on the order of operations or the order of items passed to the map or reduce function, it's going to fail in a similar manner. This is common when you have calls to First or use arrays in the MapReduce index. If you care about the order of operations, you'll need to ensure the order as part of the MapReduce index and remember that the calls are made in a recursive manner.

So how can we compute an average? The answer is obvious once you know it: we need to keep track of the Count and Total as we reduce the data and compute the Average from these. You can see this in Listing 11.6.

Listing 11.6 Properly computing average in a MapReduce index


// reduce
from r in results
group r by r.Product into g
let qty = g.Sum(x => x.Quantity)
let total = g.Sum(x => x.Total)
select new
{
    Product = g.Key,
    Quantity = qty,
    Count = g.Sum(x => x.Count),
    Total = total,
    Average = total / qty
}

All we need to do for the proper computation is make sure we aren't assuming the total dataset is passed to the reduce call. In this case, we already have all the data we need, and we can just compute the final average. Note that we'll still need to have a field called Average in the map function because the shape must match. But since we're never reading this field, we don't have to give it any special value and can just initialize it to any value we want.

Similarly to Average, we can't use Count() in the reduce either, for the exact same reason. That's why we use Count = 1 in the map and sum it in the reduce: to allow us to start the recursive computation of the total amount of items.

Complex MapReduce indexes

So far, the MapReduce indexes we've built were pretty simple, mostly doing trivial aggregation on numbers. To be fair, that's usually what you'll do with MapReduce indexes. But you can also do more with them.

A good example would be to compute the total sales per company. This sounds like just taking the Products/Sales index and changing what we're grouping on, so let's make this more interesting. What we want to do is to get, per company, the total sales per product.

Listing 11.7 shows the Companies/Purchases index, which does just that.

Listing 11.7 Computing the total products sold for each company


//map
from o in docs.Orders
from l in o.Lines
select new
{
    o.Company,
    Products = new [] { new { l.Product, l.Quantity } },
    Total = l.Quantity
}

//reduce
from r in results
group r by r.Company into g
select new
{
    Company = g.Key,
    Products = g.SelectMany(x => x.Products)
        .GroupBy(x => x.Product)
        .Select(p => new
        {
            Product = p.Key,
            Quantity = p.Sum(x => x.Quantity)
        }),
    Total = g.Sum(x => x.Total)
}

Looking at Listing 11.7, we can see that the map function looks pretty normal, but there's a considerable amount of work going on in the reduce. Let's break it apart into its components and examine each independently.

First, the map generates an entry per line item, and the entry contains a single-element Products array with the product's ID and the quantity sold. The reason we create the array in the map is that we need to match the shape of the reduce. In the reduce function, we group the results by the Company and then generate the results for that particular company. The most interesting tidbit happens when building the Products field.

There, we apply another GroupBy call to aggregate the data inside the Company once again. We use this to get the total number of items purchased for each product, and this mechanism allows us to aggregate the data across multiple invocations to reduce safely and in a natural manner.

Now, let's see what this index actually generates. Execute the following query: from index 'Companies/Purchases' where Company = 'companies/13-A' and observe the results. You can also see this in Figure 11.12.

Figure 11.12 Complex output from the index, showing aggregation inside the reduced result


This kind of index can be used to prepare complex computations ahead of time, leaning on RavenDB's ability to compute this beforehand. RavenDB will then be in charge of keeping such details up to date, and your client code can simply query the output of the index.

Reduce should...reduce the size of the data

A somewhat overlooked requirement for the reduce function is that the amount of data going out of the reduce function should be smaller than the amount of data going in. All the examples we've seen so far have done this properly, mostly because they aggregated information.

Let's look at an example where the amount of information passing through reduce doesn't go down, and discuss its implications. In the Companies/Purchases index, if the Products field were defined just as Products = g.SelectMany(x => x.Products), we would have a problem. Calling reduce wouldn't actually reduce the amount of data we're working with and, as time goes by, the reduce function would have to operate on larger and larger values.

This isn't an efficient way to do things. Since we need to keep track of intermediate values, it leads to a marked increase in memory utilization and disk space usage, and it is quite expensive overall.

Querying MapReduce indexes

When talking about MapReduce indexes, we typically focus on the Reduce portion of them — the aggregation and computation being done. This, however, is only part of the story. While a lot of stuff is going on in the Reduce portion of the index, in the end, the data is still being written to the index in exactly the same way as we've seen in the previous chapter.

This means that you can execute any of the usual operations you would normally run. As a good example, let's look at Figure 11.12. The output of the query is pretty bare bones (what is products/21-A, for example?). Listing 11.8 shows a nice way to get more information at very little cost.

Listing 11.8 Using functions to enrich the results from a MapReduce index


declare function addProductName(result) {
    for (var i = 0; i < result.Products.length; i++) {
        var p = load(result.Products[i].Product);
        result.Products[i].ProductName = p.Name;
    }
    return result;
}
from index 'Companies/Purchases' as result
where result.Company = 'companies/13-A'
select addProductName(result)

The result of the query in Listing 11.8 is shown in Listing 11.9.

Listing 11.9 Output of the query in Listing 11.8, showing enriched result set of MapReduce index


{
    "Company": "companies/13-A",
    "Products": [
        {
            "Product": "products/21-A",
            "Quantity": 1010,
            "ProductName": "Sir Rodney's Scones"
        },
        {
            "Product": "products/37-A",
            "Quantity": 101,
            "ProductName": "Gravad lax"
        }
    ],
    "Total": 1111
}

The query in Listing 11.8 should serve as a nice example of the kind of things we can do, but it's just the first taste. In addition to using projections in this manner, you can also define MapReduce fields to support full text search, apply suggestions and "more like this."

You can even query using facets on MapReduce indexes, an example of which you can see in Listing 11.10.

Listing 11.10 Facets also apply to MapReduce indexes


from index 'Companies/Purchases'
select facet(
    Total < 5000, 
    Total between 5_000 and 9_999,
    Total between 10_000 and 24_999,
    Total between 25_000 and 49_999,
    Total between 50_000 and 74_999,
    Total > 75_000,
    avg(Total)
    )

Applying facets, which can do their own aggregation, to MapReduce is an interesting experience. It allows us to do things like build a MapReduce operation to aggregate some of the data in a static fashion, then allow faceted queries to slice and dice it further. If you look at the results of the query in Listing 11.10, you'll note that the faceted results also include an average value for each facet, and we have support for the usual (min, max, sum, avg) aggregation methods.

In fact, that's not the only way to aggregate the data in multiple ways. There's also recursive MapReduce operations.

Recursive MapReduce

The sample dataset that we used consists of orders representing sales to companies. This is the typical e-commerce model and should be pretty familiar to you. A common requirement in such a system is reporting — in particular, pulling sales data on a daily, monthly and yearly basis.

Listing 11.11 shows the Products/DailySales index, which gives us the details of sales by product by day.

Listing 11.11 Computing total daily sales by product for all orders


//map 
from o in docs.Orders
from l in o.Lines
select new 
{
    o.OrderedAt.Date,
    l.Product,
    Count = l.Quantity
}

//reduce
from r in results
group r by new { r.Date, r.Product } into g
select new
{
    g.Key.Date,
    g.Key.Product,
    Count = g.Sum(x => x.Count)
}

The output of the Products/DailySales index can be seen in Figure 11.13. The only new thing we have in Listing 11.11 is the group by on multiple fields, the Date and the Product.

Figure 11.13 Showing the daily sales for each product on May 6th, 1998


Next, we need to compute the same values per month and then per year. We could define the same index again and run the exact same computation, but this time group by the product, year and month, and then again just by the product and year. But we currently have 83,830 orders in the database, and we'll likely have more. Doing the same operation on all these values again and again seems...inefficient.

RavenDB supports writing the output of a MapReduce index to a dedicated collection. Set Output reduce to collection to DailyProductSales. Figure 11.14 shows how this can be done.
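
If you define your indexes in code rather than in the Studio, the same option is exposed on the index definition. Here's a sketch, assuming the RavenDB 4.x .NET client and Order/OrderLine classes shaped like the sample data:

public class Products_DailySales : AbstractIndexCreationTask<Order, Products_DailySales.Result>
{
    public class Result
    {
        public DateTime Date { get; set; }
        public string Product { get; set; }
        public int Count { get; set; }
    }

    public Products_DailySales()
    {
        Map = orders => from o in orders
                        from l in o.Lines
                        select new
                        {
                            o.OrderedAt.Date,
                            l.Product,
                            Count = l.Quantity
                        };

        Reduce = results => from r in results
                            group r by new { r.Date, r.Product } into g
                            select new
                            {
                                g.Key.Date,
                                g.Key.Product,
                                Count = g.Sum(x => x.Count)
                            };

        // Write the reduce output to the DailyProductSales collection
        OutputReduceToCollection = "DailyProductSales";
    }
}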

Figure 11.14 Configuring MapReduce index to write the output of the index to a dedicated collection


Make the modification to the Products/DailySales index and save it. Then head to the Documents tab and look at the collections in the database. You can see the results in Figure 11.15.

Figure 11.15 An artificial document created as a result of a MapReduce's output collection


In many respects, these artificial documents will behave just like standard documents. You can load them, query them and even save modifications to them. Note that modifying an artificial document by hand is not recommended, since the next update to the index will overwrite any changes you've made. Artificial documents are not replicated. Since they're created directly by the index, they'll also be created on the remote node, so there's no point in sending them over the wire.
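
For reference, an artificial document created by this index simply carries the reduce output as its content, something along these lines (the values are illustrative):

{
    "Product": "products/1-A",
    "Date": "1998-05-06T00:00:00.0000000",
    "Count": 38
}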

Artificial documents are updated whenever the index completes a batch of documents, so there's very little lag time between the index picking up changes and the artificial documents' update. If this were the only thing artificial documents were good for, that wouldn't be of much use. After all, we already have the results as the output of the MapReduce index. So why do we need artificial documents?

The primary reason that artificial documents exist is so you can set up indexes on top of them. And that includes additional MapReduce indexes. Take a look at Listing 11.12, showing just such an example of the Products/MonthlySales index, computing the monthly totals from the daily totals.

Listing 11.12 Recursive MapReduce index using artificial documents


//map 
from ds in docs.DailyProductSales
select new 
{
    ds.Product,
    Year =  ds.Date.Year,
    Month = ds.Date.Month,
    ds.Count
}

//reduce
from result in results
group result by new 
{ 
    result.Product, 
    result.Year, 
    result.Month 
}
into g
select new
{
    g.Key.Year,
    g.Key.Month,
    g.Key.Product,
    Count = g.Sum(x => x.Count)
}

You can see that the map of the Products/MonthlySales index is using the DailyProductSales collection as its source, and the reduce aggregates the data by product on a monthly basis. Let's examine what's actually going on here in a little more depth.

Whenever an order is created or modified, the Products/DailySales index will run, computing the updated daily totals for the products in the new order. As a result of this index running, artificial documents will be created (with IDs such as DailyProductSales/3183876884104958208).

Because a DailyProductSales document was created, the Products/MonthlySales index will run on the changed daily tallies to update its own numbers. We can even set things up so we'll have an output collection for the MonthlyProductSales as well, and then define a Products/YearlySales. The recursive nature of the indexing naturally extends in this manner.
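
A sketch of what that Products/YearlySales index might look like, assuming Products/MonthlySales writes its output to a MonthlyProductSales collection:

//map
from ms in docs.MonthlyProductSales
select new
{
    ms.Product,
    ms.Year,
    ms.Count
}

//reduce
from r in results
group r by new { r.Product, r.Year } into g
select new
{
    g.Key.Product,
    g.Key.Year,
    Count = g.Sum(x => x.Count)
}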

Artificial documents do have a few limitations that you should be aware of:

  • RavenDB will detect and generate an error if you have a cycle of artificial documents. In other words, you can't define another index that will output artificial documents if that will trigger (directly or indirectly) the same index. Otherwise, you might set up a situation where the indexes run in an infinite loop.
  • You must choose an empty collection. RavenDB will not allow you to output artificial documents into a preexisting collection, because the index will overwrite any documents in that collection. Requiring an empty collection prevents existing documents from being lost.
  • The document identifiers for the artificial documents are generated by RavenDB (based on the hash of the reduce key), and you don't have any control over them.
  • Artificial documents are not sent over replication and cannot use revisions or attachments.

Artificial documents and subscriptions

You can use subscriptions and artificial documents together. (In fact, along with recursive MapReduce, that's one of the primary reasons they exist.) But you need to be aware of a small wrinkle in this setup. Because artificial documents aren't sent via replication, each node in the database group is going to have its own (independent) copy of the results. The contents are the same, but the subscription has no way of knowing this.

Because of this issue, it's recommended to use artificial documents with subscriptions only on a single node. Failover to another node may cause the subscription to resend artificial documents it has already acknowledged. You can configure this by disabling the dynamic task distribution as part of the subscription configuration.

Artificial documents and recursive MapReduce are a good match, but before you turn your RavenDB instances into a reporting powerhouse, there's another possible topology to consider. Instead of defining these indexes and processes on the main database, you can set up external replication to a dedicated database (on the same cluster or on a separate one) and run all that work there.

This can simplify distribution of work as the application grows. If you have many such indexes and a high rate of changes, being able to isolate the work to a particular database group (and thus specific nodes) can be very helpful.

MultimapReduce indexes

In the previous chapter, Listing 10.12 demonstrated the usage of multimap indexes to index several collections and unite the results into a single index. This is great when you want to search over several things at the same time. The example we looked at in Chapter 10 searched for a person by name, where the person could be an Employee, a Contact on a Company or a Contact for a Supplier. Regardless, we were able to search for that person easily and in a convenient manner.

MultimapReduce indexes allow us to extend that behavior to also include aggregation across multiple sources in a natural way. Let's first look at Listing 11.13, showing off this feature, and then explore what this means.

Go to the Indexes page and create a new index. Click on Add map twice and create the three maps. Then click Add Reduction and add the reduce. Name the new index Cities/Details and click Save.

Listing 11.13 MultimapReduce index that sums the points of interest in each city


// map #1
from c in docs.Companies
select new 
{
    c.Address.City,
    Companies = 1,
    Suppliers = 0,
    Employees = 0
}


// map #2
from s in docs.Suppliers
select new 
{
    s.Address.City,
    Companies = 0,
    Suppliers = 1,
    Employees = 0
}

// map #3
from e in docs.Employees
select new 
{
    e.Address.City,
    Companies = 0,
    Suppliers = 0,
    Employees = 1
}

//reduce
from result in results
group result by result.City 
into g
select new 
{
    City = g.Key,
    Companies = g.Sum(x => x.Companies),
    Suppliers = g.Sum(x => x.Suppliers),
    Employees = g.Sum(x => x.Employees),
}

Take a few minutes to look at Listing 11.13. There's a lot of stuff going on there. We define three maps, on the Companies, Suppliers and Employees. And for each, we output a count for the type of the document we're mapping, as well as the relevant City. Finally, on the reduce, we simply group by City and then sum up all the results from all the intermediate steps to get the final tally. Listing 11.14 shows the output from this index for London.

Listing 11.14 Output of the 'Cities/Details' index for London


{
    "City": "London",
    "Companies": 6,
    "Suppliers": 1,
    "Employees": 4
}
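
To see this result yourself, run a query along these lines:

from index 'Cities/Details'
where City = 'London'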

The Cities/Details index is interesting, in the sense that it shows off capabilities, but it isn't really that exciting. It also operates on a small dataset, even if it touches multiple collections. Let's modify the index to also operate on orders.

We'll first add another entry for each of the existing maps: OrderTotal = 0. And we'll add the same logic for the reduce function: OrderTotal = g.Sum(x => x.OrderTotal). The first step is required because all the maps and reduces in an index must have the same output. The second is required to actually sum up the information we'll shortly add. Now click on the Add map button on the index edit page and add the map shown in Listing 11.15.

Listing 11.15 Adding total order per city to the 'Cities/Details' index


from o in docs.Orders
select new
{
    o.ShipTo.City,
    Companies = 0,
    Suppliers = 0,
    Employees = 0,
    OrderTotal =  o.Lines.Sum( 
        x => x.Quantity * x.PricePerUnit 
    )
}

We now have an index that can give us a lot of interesting details about each city. Listing 11.16 shows the updated output for London in the database.

Listing 11.16 London's information now includes the total revenue for orders shipped to it


{
    "City": "London",
    "Companies": 6,
    "Suppliers": 1,
    "Employees": 4,
    "OrderTotal": 4107034.71
}

As you can imagine, this kind of behavior is powerful because it allows you to pull data from disparate parts of your system and aggregate it with very little work. Querying this information is effectively free, so that also makes it much easier to consume and work with.

MapReduce indexes are also just indexes

It bears repeating that a MapReduce index is also just an index. We focused heavily in this chapter on the Reduce portion of such indexes, but it's important to remember that they're also capable of doing everything that a map-only index can do.

In other words, if we had a Cities collection that also had the coordinates of each city, we would be able to modify the Cities/Details index to be a MultimapReduce index that also provides spatial queries. You can use facets on a MapReduce index, apply suggestions, run full text queries, etc. All too often, I see users assume the Reduce part of the index is where the functionality stops. In fact, this is where it begins.

Dynamic aggregation with MapReduce indexes

MapReduce indexes are a wonderful way to handle aggregation. RavenDB's ability to precompute and answer complex aggregation cheaply can make a world of difference in your ability to deliver features in your applications. The fact that showing an aggregated value doesn't require you to set up an off-hours job, monitor it, clear caches, etc. is a force multiplier for your application's capabilities.

However, while the MapReduce operation can seem magical at times, it's a tool fit for a purpose. Trying to use it for something it isn't intended for will produce suboptimal results. MapReduce indexes are great when

  • The fields you're aggregating by are known in advance.
  • The source data you're aggregating is known in advance.

In other words, MapReduce is great for static aggregation. If you want to get the daily totals by product, MapReduce is the perfect solution. The data source (all orders) is known. What you're aggregating by (the date and the product) is known in advance, too. RavenDB is able to generate the appropriate MapReduce operation, compute the result and allow you to query it easily and cheaply.

But what happens if we want to aggregate daily sales by product only for London? Well, that's easy. We can define another MapReduce index that aggregates the results by date, product and city. We get the same benefits, and everyone is happy. But the next request is to get the daily sales by city based on the supplier, not the product. And the next request after that is to aggregate the sales by the employee on a monthly basis, and the one after that is to see the yearly sales by product only for specific customers, and then...

I think you get the picture. MapReduce indexes are great when the type of aggregation is known in advance. In this case, RavenDB is able to prepare everything and have an answer ready for you by the time you query. But if your queries are dynamic and fluid, and if you're changing what you're querying on and how you're doing the aggregation, this is much more complex.

One option would be to define a MapReduce index for each type of aggregation you need. This works, but you might end up with a lot of MapReduce indexes. That can be fine. RavenDB is very efficient in the handling of MapReduce indexes (they're typically cheaper to run than a map-only index, actually), but a large number of indexes still means that each change triggers some amount of work multiplied by the number of indexes.

A better approach for dynamic aggregation is to use facets. To see the total sales for egg noodles and cheese in the first week of July 1996, we can run the query in Listing 11.17.

Listing 11.17 Using facets on top of a MapReduce index to achieve dynamic aggregation


from index 'Products/DailySales'
where Date between '1996-07-01' and '1996-07-07' 
and   Product in ('products/11-A', 'products/42-A')
select facet(Product, sum(Count))

Let's break the query in Listing 11.17 apart, one clause at a time. First, we select the MapReduce index Products/DailySales, which we've already seen in Listing 11.11. The where clause specifies the date range we want to query and the particular products we're interested in. Note that the Date query is using between and relies on the fact that we do lexical comparisons to get a clear syntax for the date range.

The select clause is using facets, but unlike the facets queries we looked at in the previous chapter, we're now adding a new wrinkle in the form of sum(Count). This allows us to do an aggregation operation over the query results. In this way, we can dynamically control what will be aggregated.

Cost analysis for dynamic aggregation (facets) vs. static aggregation (MapReduce)

An important distinction needs to be made about the cost structures of using aggregation in MapReduce vs. facets. For MapReduce, by the time you're querying the information, it's already been aggregated. You're actually doing a search on top of the already precomputed results.

Aggregation with facets (which is sometimes also called dynamic aggregation) requires us to run over the results of the query and compute the final aggregated values. Note that the amount of results the faceted query needs to aggregate is just the query results, not the total size of the source data.

This can be confusing, so we'll use the example of the Products/DailySales index and the faceted query in Listing 11.17 to clear things up. We have the following values:

  • D — the total number of orders. In this database, after running the patch operation in Listing 11.2, the number is 83,830 documents.
  • R — the unique reduce keys after the MapReduce aggregation. In the case of the Products/DailySales index, that's the unique (date, product) pairs that we're grouping by. This value is 2,106 unique reduce keys for the dataset we have.
  • Q — the number of results matched in the query. For the query in Listing 11.17 (sans facets), that number is two.

With these values, we can now give proper estimates of the costs of making various queries. A typical cost of a MapReduce query is O(log R). So the cost for a query on Products/DailySales would be about 11 operations, since log2(2,106) ≈ 11.

A faceted query on all orders will have a cost of O(D), with D equal to 83,830. This computation is done each time the query is run. However, the only reason the cost is O(D) is that we queried over all orders. Because the number of results for the query in Listing 11.17 is two, the cost of actually aggregating them is effectively nil.

It's a great way to handle such scenarios: use MapReduce to do the first level of the aggregation, and then use dynamic aggregation using facets to further slice and dice the data as you need.

The result of the faceted query is affected by the matches for the query, and that can change dynamically with very little cost, opening the way to more dynamic aggregation queries. Dynamic aggregation also has a pretty straightforward cost, since it varies linearly with the number of matches for the query.

That's a great thing since it simplifies dynamic queries. But it also means that if you want to run a faceted query with aggregation on a very large result set, it's going to take time to process. A MapReduce on the same (or much larger) amount of data will be much faster at querying time, but it's limited in the amount of flexibility it allows for each query.

Combining MapReduce and facets to handle this is a great way to reduce¹ the amount of data the facets need to go through. It's easiest to think of this approach as feeding the facets baby food, already pre-chewed. That dramatically cuts down the amount of effort required to get the final result.

Multi-step aggregation processes

We've looked at all sorts of MapReduce indexes in this chapter, and we also looked at how we can use dynamic aggregation to build dynamic aggregated queries. Now I want to apply all that knowledge to a more complex example, utilizing many of the features we explored in this chapter.

Here's the requirement: we want to be able to get a monthly report of sales by supplier per city. It might be easier to understand if we start from the point of view of the user interface. Take a look at Figure 11.16, which shows a mockup of the search interface and the results we want to see.

Figure 11.16 Mockup of the user interface for the kind of data we want


The simplest way to be able to answer this query is to use LoadDocument in the map phase of the index. You can see how this is done in Listing 11.18.

Listing 11.18 MapReduce index using LoadDocument can pull data from related documents


//map
from o in docs.Orders
from l in o.Lines
let product = LoadDocument(l.Product, "Products")
select new
{
    o.ShipTo.City,
    o.ShippedAt.Month,
    o.ShippedAt.Year,
    Supplier =  product.Supplier,
    Total = l.PricePerUnit * l.Quantity
}
// reduce
from result in results
group result by new 
{ 
    result.City, 
    result.Supplier,
    result.Month,
    result.Year
} 
into g
select new 
{
    g.Key.City,
    g.Key.Supplier,
    g.Key.Month,
    g.Key.Year,
    Total = g.Sum(x => x.Total)
}

The interesting bit in Listing 11.18 is the LoadDocument call in the map. Everything else is pretty much the same as we've done throughout this chapter. We looked at LoadDocument in the previous chapter, and it serves mostly the same role in MapReduce indexes as well. If the product document has changed, we'll have to re-index all the documents that referenced it. Because of this, the LoadDocument option is only available during the map phase of a MapReduce index. You can't call LoadDocument from the reduce.

A sample query for this index can be seen in Listing 11.19.

Listing 11.19 This query can be used to render the results in Figure 11.16.


from index 'Sales/ByCityAndSupplier' as t
where t.City = 'London' and t.Month = 2 and t.Year = 1997
order by t.Total as double desc
load t.Supplier as s
select s.Name, t.City, t.Supplier , t.Month, t.Year, t.Total

This works. It generates the results you see in Figure 11.16. However, there's a problem here. LoadDocument requires that RavenDB updates the index if the referenced document (the product's document, in this case) is updated. This means that an update to a product can force RavenDB to re-index all the orders that have this product. If it's a popular product, this may require RavenDB to re-index a large number of orders.

Ideally, we want to have LoadDocument where the number of referencing documents is bounded and small. Is there a better way to handle this? We can change the index in Listing 11.18 so it doesn't include the Supplier, and instead groups things only by (product, city, month, year). We'll also define an output collection for the results. The output collection is then used in another index where we'll use a LoadDocument to achieve the same output.

Why all of this complexity? Aren't we in exactly the same position as we were before? An update to a product document will force us to re-index, after all. The answer is yes, it will force us to re-index, but the question is what gets re-indexed. In the case of Listing 11.18, any change to a product forces re-indexing of all the Orders documents that referenced it. But with an intermediate artificial document collection, we only need to re-index the artificial documents that reference the modified product.

Those documents have already gone through a MapReduce process, and there are likely to be far fewer of them than there are orders, so this is a net win in terms of the total amount of work that has to be done.
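
To make the two-stage shape concrete, here's a sketch of both indexes (the ProductCitySales collection name and the exact field layout are illustrative):

//map (stage 1: no LoadDocument; group by product, city, month and year)
from o in docs.Orders
from l in o.Lines
select new
{
    l.Product,
    o.ShipTo.City,
    o.ShippedAt.Month,
    o.ShippedAt.Year,
    Total = l.PricePerUnit * l.Quantity
}

//reduce (stage 1; Output reduce to collection set to ProductCitySales)
from r in results
group r by new { r.Product, r.City, r.Month, r.Year } into g
select new
{
    g.Key.Product,
    g.Key.City,
    g.Key.Month,
    g.Key.Year,
    Total = g.Sum(x => x.Total)
}

//map (stage 2: LoadDocument now runs over the much smaller artificial collection)
from s in docs.ProductCitySales
let product = LoadDocument(s.Product, "Products")
select new
{
    s.City,
    s.Month,
    s.Year,
    Supplier = product.Supplier,
    Total = s.Total
}

//reduce (stage 2)
from r in results
group r by new { r.City, r.Supplier, r.Month, r.Year } into g
select new
{
    g.Key.City,
    g.Key.Supplier,
    g.Key.Month,
    g.Key.Year,
    Total = g.Sum(x => x.Total)
}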

Summary

This chapter covered a lot of ground. I tried to strike the right balance between giving you enough information about what's actually going on behind the scenes and drowning everything in implementation details.

The key takeaway I hope you'll get from this chapter is that aggregation in RavenDB is cheap and plentiful, so have at it. This is in direct contrast to the way things usually work with other databases. Aggregation is often expensive and hard, so it gets pushed to dedicated solutions (nightly runs, reporting databases, cached queries, etc.). None of this is needed with RavenDB.

We started this chapter by writing some simple RQL queries using group by, and it just worked. The query optimizer recognized the query and generated the appropriate MapReduce index, and we were off to the races. We then took a deep, hard look at what was actually going on there, analyzing what the index was doing and how MapReduce works inside RavenDB.

In order to better understand that, RavenDB has a few tools built in to help you. Chief among them is the MapReduce visualizer, which lets you peek deep into the heart of how RavenDB executes MapReduce operations. We looked at the different behaviors that happen when we have a small amount of items to aggregate for a particular reduce key (all items are reduced in a single group) and when we have a very large amount of items (reduce is handled in a recursive tree fashion).

This means that updates to MapReduce indexes are very fast because we usually need to do a minimal amount of work to get the updated results. After learning how RavenDB processes MapReduce indexes, we learned how we can define our own, with complete freedom of how we want to structure the data and aggregate it.

We also learned there are some things to remember when you build your indexes, such as making sure that the map and reduce functions are pure, that your reduce function can handle being called recursively and that the reduce outputs less data than we put in.

We then upped the ante and looked at more complex MapReduce indexes, grouping all the results by company. Inside each company, we grouped things again by product to get a detailed summary of the results. These kinds of indexes can allow you to do sophisticated aggregation, rollups and computation during the indexing. Then they expose the whole thing, readily consumable, to your application.

MapReduce indexes can do amazing things, but they're also just indexes in RavenDB. That means that, in addition to whatever you're doing in the MapReduce operation, all the usual index features are available. We looked at how we can use JavaScript projections to enrich the MapReduce output during query, use facets on the data (including additional aggregation) and take advantage of many more options (full text search, spatial queries, etc.).

If a single MapReduce index isn't enough, you can always try things recursively. Indeed, RavenDB allows MapReduce indexes to output artificial documents back to the database, and these documents are a key part of allowing RavenDB to handle recursive MapReduce indexes. We looked at how we can do a daily and monthly rollup of sales per product by creating a MapReduce index that's fed off the artificial documents collection that's created by another MapReduce index. This opens up the option to do some really cool things because you can pipeline the work and take advantage of what's already been done.

Another cool option we looked at was the ability to use MultimapReduce indexes. Instead of just aggregating data from a single collection, we can aggregate the data from several of them at once. That allows you to, with very little effort, paint a picture of what's actually going on in your database and gain insight into what you're doing. We saw a good example of that when we looked at what was going on in each city and were able to tell how many companies, employees, suppliers and sales we had in each location.

MapReduce is fantastic when you have a static aggregation scenario — when you know what you're aggregating ahead of time. Otherwise, you'll need to (either directly or via the query optimizer) generate MapReduce indexes for each permutation that you want to query. Another alternative to that is the notion of dynamic aggregation, using facets to slice and dice the information dynamically. This tends to be less efficient than a MapReduce index, but it has less upfront costs.

It's common to do that in two stages: first defining the rough shape of the aggregation in a MapReduce index, efficiently doing the aggregation, and then using facets and dynamic aggregation on that much smaller result set to narrow down things more accurately. This gives you both fast aggregations and more flexibility in the queries.

We finished the chapter by looking at aggregating documents by a related document, using LoadDocument in a MapReduce index. This works, and it can be quite an elegant solution for some scenarios. But it's also possible to get into trouble with this approach because an update to a referenced document requires re-indexing of all the referencing documents. Instead, we can utilize artificial documents and two MapReduce indexes to reduce the amount of work required when we need to re-index.

The MapReduce engine inside RavenDB is very flexible, and it's been used to great effect over the years — including in many scenarios that surprised RavenDB's own development team. Aggregation of results is only the most obvious of the options that are available to you.

In the next chapter, we'll switch gears a bit and move to a more practical mindset, talking about how to utilize and work with indexes in your applications.


  1. Pun very much intended here.