MapReduce and Aggregations in RavenDB
MapReduce is an old term. It came from Lisp and was used as early as the 1960s. For a long time, it was primarily known to functional-language aficionados and rarely seen outside their circles. In 2004, Google released the paper "MapReduce: Simplified Data Processing on Large Clusters," and MapReduce was instantly a big hit in distributed programming circles. Everyone had to have a distributed MapReduce implementation.
RavenDB is a distributed database, and it uses MapReduce. However, it doesn't do so in the context of distributed computing. Instead, RavenDB uses MapReduce for aggregation of data on each node independently. If you're used to MapReduce jobs that run on large clusters processing terabytes of data, this might look very strange. But RavenDB isn't using MapReduce to break apart large computations across different machines. Instead, it uses MapReduce to break apart computations across time.
It's usually easier to explain with an example, so let's jump into that.
Executing simple aggregations
Listing 11.1 shows a simple aggregation query, giving us the total number of orders and items purchased by a particular company.
Listing 11.1 A simple aggregation query in RavenDB
from Orders as o
group by o.Company
where o.Company = 'companies/1-A'
select count() as NumberOfOrders,
sum(o.Lines[].Quantity) as ItemsPurchased,
o.Company
The query in Listing 11.1 should be familiar to anyone who's used SQL before. Now, let's analyze what RavenDB must do in order to answer this kind of query:
- Find all the Orders documents where the Company field is set to 'companies/1-A'.
- Iterate over these documents, count their number and sum the quantity of items in each.
- Return the results to the client.
This seems quite straightforward, but it has a couple of issues, in particular with the first two steps. Our sample data amounts to a little over a thousand documents in the database. There isn't much we can do to make any query expensive over such a small dataset. However, with real-world datasets, we'll typically deal with collections that contain hundreds of thousands to many millions of documents.
Consider the first step, which is finding all the Orders
documents with the Company
field set to a value. If I have a few million
documents to scan, that alone can be quite expensive, both in I/O (to read the data from disk) and computation (to check equality
so often). This is obviously something that we can optimize with an index. It's the next step that's hard to work with.
If a company has a lot of orders, then the process of iterating over each of these orders can be extraordinarily expensive. Consider the case where you want to show the number of orders and the number of items purchased on the company page. This is a small amount of information that can tell you how important this particular customer is.
However, if we need to repeat the second step each time we run the query, that can get expensive very quickly. What's worse, the more important the company, the more orders and items it purchased from us and the slower things will become. This method won't work. It punishes success, and that isn't something we want.
Typically, you won't run such queries directly because of how costly and slow they are. Instead, you'll write a job that will run these queries at idle times and cache the results. Then you only have to deal with cache invalidation, making sure this job is never run under load, explaining to users that the results are delayed and so on and so forth. Suddenly, this is a big task. It's complex and will take time to complete.
But all you really wanted was to put a couple of numbers on a page, and suddenly you need to deploy a background job, monitor its cost and execution and wait until the daily or weekly run to get updated numbers. Such a high cost of aggregation will usually cause such features to be dropped.
And even if you don't have enough data to require such measures, aggregation queries are still typically costly for the database — enough so that they're used sparingly in most systems.
RavenDB's aggregation doesn't work like this.
The gory details of aggregation in RavenDB
Instead of gathering all the data and then aggregating it all in place, RavenDB uses MapReduce to break apart the aggregation computation into discrete steps: the "map" and the "reduce."
Let's look at how RavenDB actually processes the query in Listing 11.1. Run the query in the listing and then click on the index button in the results, as shown in Figure 11.1.
We're going to go deep into how RavenDB handles aggregations. You won't typically need to know this level of detail, and you should feel free to skip this section if you don't think you need such knowledge. I'm including it here because it's important to understand the implications of how things work and why aggregation queries in RavenDB are so cheap.
You can already see that an aggregation query in RavenDB is also using an index. In addition to the usual options, such as the
index terms and performance statistics, there's also the MapReduce visualizer, which we'll look at in more detail later. For now,
click on the View Index
option, which should open the index details page shown in Figure 11.2.
Figure 11.2 shows the structure of the index. Operating over Orders
, it groups by the Company
and aggregates over them. One thing to note here is that there's no mention anywhere of companies/1-A
. Even though the query in
Listing 11.1 mentioned it, the index isn't operating on that particular value but rather on the generic concept of aggregating by
the Company
field.
In other words, as usual, RavenDB looked at the query and generalized the operation to answer any question using Company
. But what about the aggregation?
Aggregation is actually handled via two separate actions, map
and reduce
. The first stage runs a map
operation on
each of the documents, grabbing just Company
, sum(Lines[].Quantity)
and Count = 1
from each of the Orders
documents. The
second stage is to group them all by Company
and run the reduce
operation to get the final result by each of the
Company
field values.
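Conceptually, the auto-index behind Listing 11.1 pairs a map and a reduce along the lines of the following sketch. This is an illustration only: the real index is generated internally by the query optimizer, and its output field is actually named Lines[].Quantity rather than the LinesQuantity stand-in used here.
// map (conceptual sketch of the auto-index)
from o in docs.Orders
select new
{
    o.Company,
    Count = 1,
    LinesQuantity = o.Lines.Sum(l => l.Quantity)
}
// reduce (conceptual sketch of the auto-index)
from r in results
group r by r.Company into g
select new
{
    Company = g.Key,
    Count = g.Sum(x => x.Count),
    LinesQuantity = g.Sum(x => x.LinesQuantity)
}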
If this doesn't make sense to you, don't worry. RavenDB contains a few tools specifically to help you understand how the
MapReduce
process works.
In the Indexes
menu item, go to Map-Reduce Visualizer
and then select the Auto/Orders/ByCountAndLines[].QuantityReducedByCompany
index from the dropdown. Then search for the following document IDs: orders/396-A
and orders/445-A
. The result should look
similar to Figure 11.3.
Clicking on the {"Company":"companies/1-A"}
rectangle in Figure 11.3 will give us more details about that particular value, as
you can see in Figure 11.4.
With the details in Figures 11.3 and 11.4, you can now see exactly what I mean when I talk about map
output and the resulting
aggregation. The documents we selected (orders/396-A
and orders/445-A
) both belong to companies/1-A
, and we can see that
for orders/396-A
, the map
output was {"Company": "companies/1-A", "Count": 1, "Lines[].Quantity": 38}
. Indeed, if we go
and inspect the document, we'll see three line items, with quantities of 15, 21 and 2, totaling 38. For orders/445-A
, we can
see that the total quantity is 20, with a single line item.
This is interesting, but what's even more interesting is the aggregated value. For companies/1-A
, you can see the aggregated
values of a total of six orders for this company, with a final quantity of 174 items ordered. Clicking on the aggregation summary
will take us even further down, into the individual page entries, as you can see in Figure 11.5.
In Figure 11.5, you can see all the details for each of the entries for companies/1-A
. This is the lowest level we'll need to inspect, and it's also the key to why RavenDB implements aggregation in this manner. I've already spent several pages just explaining
what's going on. Why is the aggregation implementation for RavenDB so complex?
The reason is quite simple, actually. We don't recompute the aggregation on every query; instead, we compute the aggregation result once, and then we store it. When we issued the query in Listing 11.1, we queried only for results
for companies/1-A
, but the index that the query optimizer generated for us applies to all companies.
If we now run the same query but for companies/2-A, we'll be able to reuse the same index. In fact, we'll have to do very little work. The query will use the existing index and fetch the already precomputed results for companies/2-A
. It won't have to actually perform any aggregation whatsoever. All the work has already been done.
As great as that is, you might be asking yourself why there's this level of complexity. After all, surely we could have done the same thing without so many moving parts, right? That's correct, but there's one additional item that you need to consider. How are we going to handle updates?
The MapReduce indexes in RavenDB aren't simply a cache of the already computed results. Instead, we store the data in such a
way that makes it cheap to also update the results. Consider what will happen inside RavenDB when a new order comes in. We'll run
the map
portion of the index, getting the Company
, sum(Lines[].Quantity)
and Count = 1
from the newly created document.
The easiest way to visualize that is to add another row to Figure 11.5. At this point, RavenDB can then just aggregate the new results alongside the already existing result and get to the final tally. In other words, the complexity here exists in order to allow RavenDB to efficiently update MapReduce results when documents are created or updated.
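To make that concrete, here's a minimal sketch in plain C# (not RavenDB code) of what happens when a hypothetical new order with 10 items arrives for companies/1-A, given the stored aggregate of six orders and 174 items we saw earlier:
// The aggregate already stored in the index for companies/1-A:
var existing = new { Company = "companies/1-A", Count = 6, Quantity = 174 };
// The map output of the hypothetical new order:
var incoming = new { Company = "companies/1-A", Count = 1, Quantity = 10 };
// Reducing the stored aggregate together with the new map entry
// yields the updated tally; the six existing orders are never touched.
var updated = new
{
    existing.Company,
    Count = existing.Count + incoming.Count,         // 7
    Quantity = existing.Quantity + incoming.Quantity // 184
};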
This works great when we have a small number of items to aggregate. Such is the case with companies/1-A
. But what happens when
the number of items grows? Let's increase the number of documents we're aggregating a hundredfold and see where that
takes us. Go to Documents
and then Patch
and run the update script in Listing 11.2.
Listing 11.2 Increase the number of Orders documents by a hundred
from Orders
update {
    // Create 100 copies of every order. Passing the "orders/" prefix
    // to put() tells RavenDB to generate a new ID for each copy.
    for (var i = 0; i < 100; i++ ){
        put("orders/", this);
    }
}
After running this script, we should have 83,830 Orders documents in the database (830 original Orders and a hundred times that number that we
just entered). Where we previously had six entries for companies/1-A
, we now have 606. Let's look at how that works.
Go back to the MapReduce visualizer and select the
Auto/Orders/ByCountAndLines[].QuantityReducedByCompany
index. Then add the orders/396-A
and orders/445-A
documents.
The result is shown in Figure 11.6.
This is very similar to how it looked before, and indeed, the structure of the data is exactly the same. This is because the
number of entries for companies/1-A
is still quite small. Let's select another pair of documents, this time belonging to
companies/77-A
and see what kind of structure we have there. Add orders/77-A
and orders/146-A
to the visualizer and
take a look at the results, as shown in Figure 11.7.
On the left in Figure 11.7, you can see that, unlike for companies/1-A
in Figure 11.3 where we had a flat list, Figure 11.7
shows a tree structure. Indeed, once we're past a certain size, RavenDB will start processing MapReduce entries in a treelike fashion.
Consider an update to orders/77-A
and how RavenDB will apply it. First, we'll run the map
on the updated document, giving
us the map entry to write to #1028
. Then, we'll run the reduce on that page, giving us the final tally for this page. We'll
then recurse upward, toward #1437
, where we'll also run the reduce.
The end result is that an update operation costs us a single map
invocation and two reduce
calls to update the final
result for companies/77-A
. In other words, if we have a lot of data for a particular key, RavenDB will start segmenting
this data and apply aggregation operations in such a way that reduces the number of operations required to a minimum. This ensures
that we're not only able to answer queries efficiently but are also able to update the MapReduce results with very little
cost.
Using aggregation in RavenDB
Aggregation operations in RavenDB are cheap, both to compute and to query. This is in stark contrast to the usual behavior of aggregation queries in other databases. The more data you have to go through, the more efficiently RavenDB will be able to process and aggregate it.
While typically you'll run aggregation as a daily or weekly report (usually during off hours) and store these results for use later, RavenDB allows you to just query the aggregated data. RavenDB will provide the answers you need, as well as keep everything up to date.
Defining your own MapReduce indexes
We've seen from Listing 11.1 how we can query using group by
and RavenDB will generate a MapReduce index for us behind the
scenes. This works quite nicely, but there's only so much that you can do with a dynamic query before the query optimizer will
give up.
The query optimizer can do quite a lot, but in order to make sure that it's predictable, it's currently limited to recognizing and being able to generate MapReduce indexes from a fairly small list of predictable patterns. For more complex things, you'll need to create your own MapReduce index. Luckily, this is easy to do.
Let's say we want to get the total number of sales and the total amount we made on each product. We can't express this as
a simple query, so we'll need to create a MapReduce index of our own for that. Go to the Indexes
page and click on
New Index
. Name the index Products/Sales
, then click the Add Reduction
button. Listing 11.3 has the contents of the Map
and Reduce
fields. After you're done filling these in, click Save
to create the new index.
Listing 11.3 Compute total number of sales and revenue per product
// map
from o in docs.Orders
from l in o.Lines
select new
{
l.Product,
l.Quantity,
Total = (l.Quantity * l.PricePerUnit) * (1 - l.Discount)
}
// reduce
from r in results
group r by r.Product into g
select new
{
Product = g.Key,
Quantity = g.Sum(x => x.Quantity),
Total = g.Sum(x => x.Total)
}
Before we get to investigating how this index works, let's talk about what it does. The map portion runs over all the orders and
all the line items for each order. Then, for each of the line items, we output an entry with the quantity sold and the amount
of money we made on this product. The reduce will group all these results based on the Product
field and then sum up all the
final numbers for the Total
and Quantity
sold.
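If you'd rather define and deploy this index from client code than through the studio, the definition might look like the following sketch. It assumes the RavenDB .NET client and minimal Order and OrderLine classes; by client convention, the class name Products_Sales maps to the index name Products/Sales.
using System.Collections.Generic;
using System.Linq;
using Raven.Client.Documents.Indexes;

public class OrderLine
{
    public string Product;
    public int Quantity;
    public decimal PricePerUnit;
    public decimal Discount;
}

public class Order
{
    public List<OrderLine> Lines;
}

public class Products_Sales : AbstractIndexCreationTask<Order, Products_Sales.Result>
{
    public class Result
    {
        public string Product;
        public int Quantity;
        public decimal Total;
    }

    public Products_Sales()
    {
        // The same map as Listing 11.3: one entry per order line.
        Map = orders =>
            from o in orders
            from l in o.Lines
            select new
            {
                l.Product,
                l.Quantity,
                Total = (l.Quantity * l.PricePerUnit) * (1 - l.Discount)
            };

        // The same reduce as Listing 11.3: group by product and sum.
        Reduce = results =>
            from r in results
            group r by r.Product into g
            select new
            {
                Product = g.Key,
                Quantity = g.Sum(x => x.Quantity),
                Total = g.Sum(x => x.Total)
            };
    }
}

// Deploy with: new Products_Sales().Execute(store);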
Now that we've really seen map
and reduce
, it should be easier to grasp how they're truly two separate
operations. It's also important to understand that we aren't actually running the map
or the reduce
on the full results
all the time. Instead, we break the results apart internally and apply map
and reduce
to portions of the data each time.
This leads to several important restrictions on MapReduce indexes:
- Both the map and reduce functions must be pure functions. In other words, they should have no external input, and calling them with the same input must always return the same output. In particular, usage of Random or DateTime.Now and similar calls is not allowed.
- The output of the map is fed into the reduce. This should be obvious, but what may not be obvious is that the output of the reduce is also fed into the reduce, recursively. In particular, you should make no assumptions in the reduce about the number of calls or the amount of data that you have to process in each invocation of reduce.
- The output of the reduce must match the output of the map. (Because both are being fed back into the reduce, they must have the same structure.) RavenDB will error if you have a different shape for each of the functions.
Because of these restrictions, RavenDB can apply the map
and reduce
functions in an incremental fashion and generate the
results that we've already seen. This is the key to RavenDB's ability to compute aggregation cheaply over time.
We can now run the query in Listing 11.4 to find the top-grossing products:
Listing 11.4 Finding the top-grossing products across all orders
from index 'Products/Sales'
order by Total as double desc
You'll note that the results of this query are instantaneous. Even though we have tens of thousands of records,
RavenDB only needs to look through 77 precomputed results. We can also sort by Quantity
to find the most popular products.
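From the .NET client, that most-popular-products query could be issued along these lines (a sketch: store is assumed to be an initialized DocumentStore, and Result is the class from the earlier index sketch):
using (var session = store.OpenSession())
{
    // Still instantaneous: this sorts the 77 precomputed results,
    // it doesn't rescan the orders.
    var mostPopular = session.Advanced
        .RawQuery<Products_Sales.Result>(
            "from index 'Products/Sales' order by Quantity as long desc")
        .ToList();
}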
Go into the Map-Reduce Visualizer
and select the Products/Sales
index. Then, enter orders/6-A
and orders/10-A
to
see the internal structure of the MapReduce index, as shown in Figure 11.8.
As you can see in Figure 11.8, all of the entries are big enough to require a tree structure. Let's take a closer look
at products/39-A
, with Figure 11.9 zooming in on a single page inside that tree.
Page #578
holds over 25,000 entries for this product. How can a single page hold so many? The answer is that RavenDB applies compression to
the mapped entries. Since they're mostly similar, they have a high compression rate, allowing
us to pack a lot of entries into a small amount of space. This also improves the performance of updating entries in the tree,
since we don't have to do as much work and the depth of the tree is much smaller.
This is about as low-level as we'll get when discussing the MapReduce implementation. You should now have a good feel for the relative costs of using aggregation in general — and MapReduce in particular — in RavenDB. With that knowledge under your belt, let's explore some of the more interesting things that we can do with MapReduce indexes in RavenDB.
Common pitfalls with MapReduce indexes
Certain operations are not easy to perform within the limits that RavenDB places on the map and reduce functions. Probably the
most obvious example here is the problem with calculating an average. Let's edit the Products/Sales
index to add an average.
Listing 11.5 shows the updated definition with the most obvious (and wrong) way to do so:
Listing 11.5 Computing average (wrongly) in MapReduce index
// map
from o in docs.Orders
from l in o.Lines
select new
{
l.Product,
l.Quantity,
Count = 1,
Total = (l.Quantity * l.PricePerUnit) * (1 - l.Discount),
Average = 0, // so we'll have the same shape as the reduce
Debug = new object[0] // same shape as reduce
}
// reduce
from r in results
group r by r.Product into g
select new
{
Product = g.Key,
Quantity = g.Sum(x => x.Quantity),
Count = g.Sum(x => x.Count),
Total = g.Sum(x => x.Total),
Average = g.Average(x => x.Total),
Debug = g.Select(x => x.Total).ToArray()
}
Save the updated index and then run the following query: from index 'Products/Sales' where Product = 'products/1-A'
.
The result is shown in Figure 11.10.
The results shown in Figure 11.10 are wrong. The average price for a box of chai (which is what products/1-A
is) is not
$161,449.793. So why are we getting this result?
The answer is simple. The Average
call isn't complex: it sums the input elements and then divides that sum by the number of elements. But remember that map
and reduce
are not called on each individual value. Instead, they're called in batches. This means by the time the final call to reduce
happened, it didn't get a flat list of all the results. Instead, it got an aggregated list. Figure 11.11 shows the Debug
field from
the index, which can allow us to better understand what's going on.
As you can see, the computation of the Average
is correct for the input it received. Because we executed the reduce
multiple
times, by the time the final round came, we fed it seven results, each of them aggregating about 500 documents. In other words, when we write a MapReduce index, we need to take into account the fact that we never have access to the total
result set. This makes sense when you think about it. You don't want to have a single function invocation that has to go through
many tens of thousands of documents (or many more, on realistic datasets).
Don't assume that the order is fixed, either
If your MapReduce index does work that relies on the order of operations or the order of items passed to the map or reduce function, it's going to fail in a similar manner. This is common when you have calls to First or use arrays in the MapReduce index. If you care about the order of operations, you'll need to ensure the order as part of the MapReduce index and remember that the calls are made in a recursive manner.
So how can we compute an average? The answer is obvious once you know it: we need to keep track of the Count
and Total
as we are reducing the data and compute the Average
using these. You can see it in Listing 11.6.
Listing 11.6 Properly computing average in a MapReduce index
// reduce
from r in results
group r by r.Product into g
let qty = g.Sum(x => x.Quantity)
let total = g.Sum(x => x.Total)
select new
{
Product = g.Key,
Quantity = qty,
Count = g.Sum(x => x.Count),
Total = total,
Average = total / qty
}
All we need to do for the proper computation is make sure we aren't assuming the total dataset is passed to the
reduce
call. In this case, we already have all the data we need, and we can just compute the final average. Note that we'll
still need to have a field called Average
in the map
function because the shape must match. But since we're never reading
this field, we don't have to give it any special value and can just initialize it to any value we want.
Similarly to Average
, we can't use Count()
in the reduce
either, for the exact same reason. That's why we use Count = 1
in the map
and sum it in the reduce
: to allow us to start the recursive computation of the total amount of items.
Complex MapReduce indexes
So far, the MapReduce indexes we've built were pretty simple, mostly doing trivial aggregation on numbers. To be fair, that's usually what you'll do with MapReduce indexes. But you can also do more with them.
A good example would be to compute the total sales per company. This sounds like just taking the Products/Sales
index and
changing what we're grouping on, so let's make this more interesting. What we want to do is to get, per company, the total
sales per product.
Listing 11.7 shows the Companies/Purchases
index, which does just that.
Listing 11.7 Computing the total products sold for each company
//map
from o in docs.Orders
from l in o.Lines
select new
{
o.Company,
Products = new [] { new { l.Product, l.Quantity } },
Total = l.Quantity
}
//reduce
from r in results
group r by r.Company into g
select new
{
Company = g.Key,
Products = g.SelectMany(x => x.Products)
.GroupBy(x => x.Product)
.Select(p => new
{
Product = p.Key,
Quantity = p.Sum(x => x.Quantity)
}),
Total = g.Sum(x => x.Total)
}
Looking at Listing 11.7, we can see that the map
function looks pretty normal, but there's a considerable amount of
work going on in the reduce
. Let's break it apart into its components and examine each independently.
First, the map
generates an entry per line item, and the entry contains a single element Products
array with the product's
ID and the quantity sold. The reason we create the array in the map is that we need to match the shape of the reduce
. In the
reduce
function, we group the results by the Company
and then generate the results for that particular company. The most
interesting tidbit happens when building the Products
field.
There, we apply another GroupBy
call to aggregate the data inside the Company
once again. We use this to get the total
number of items purchased for each product, and this mechanism allows us to aggregate the data across multiple invocations to
reduce
safely and in a natural manner.
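To see why the inner GroupBy matters, here's a small plain-C# sketch of the reduce from Listing 11.7 running over two intermediate results for the same company. The 400/610 split between batches is invented for illustration, but the merged totals match the output you'll see shortly in Figure 11.12 (requires using System.Linq):
// Two partial reduce outputs for companies/13-A, from different batches.
var partials = new[]
{
    new
    {
        Company = "companies/13-A",
        Products = new[] { new { Product = "products/21-A", Quantity = 400 } },
        Total = 400
    },
    new
    {
        Company = "companies/13-A",
        Products = new[]
        {
            new { Product = "products/21-A", Quantity = 610 },
            new { Product = "products/37-A", Quantity = 101 }
        },
        Total = 711
    }
};

// The reduce from Listing 11.7 merges the nested arrays by product:
// products/21-A => 1010, products/37-A => 101, Total => 1111.
var merged = from r in partials
             group r by r.Company into g
             select new
             {
                 Company = g.Key,
                 Products = g.SelectMany(x => x.Products)
                             .GroupBy(x => x.Product)
                             .Select(p => new
                             {
                                 Product = p.Key,
                                 Quantity = p.Sum(x => x.Quantity)
                             }),
                 Total = g.Sum(x => x.Total)
             };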
Now, let's see what this index actually generates. Execute the following query:
from index 'Companies/Purchases' where Company = 'companies/13-A'
and observe the results. You can also see this in
Figure 11.12.
This kind of index can be used to prepare complex computations ahead of time, leaning on RavenDB's ability to compute this beforehand. RavenDB will then be in charge of keeping such details up to date, and your client code can simply query the output of the index.
Reduce should...reduce the size of the data
A somewhat overlooked requirement for the reduce function is that the amount of data going out of the reduce function should be smaller than the amount of data going in. All the examples we've seen so far have done this properly, mostly because they aggregated information.
Let's look at an example where the amount of information that passes through reduce doesn't go down and discuss its implications. In the Companies/Purchases index, if the Products field was defined just as Products = g.SelectMany(x => x.Products), we would have a problem. Calling reduce won't actually reduce the amount of data we're working with and, as time goes by, the reduce function will have to operate on larger and larger values.
This isn't an efficient way to do things, and since we need to keep track of intermediate values, it will lead to a marked increase in memory utilization and disk space usage, and it will overall be quite expensive.
Querying MapReduce indexes
When talking about MapReduce indexes, we typically focus on the Reduce portion of them — the aggregation and computation being done. This, however, is only part of the story. While a lot of stuff is going on in the Reduce portion of the index, in the end, the data is still being written to the index in exactly the same way as we've seen in the previous chapter.
This means that you can execute any of the usual operations you would normally run. As a good example, let's look at
Figure 11.12. The output of the query is pretty bare bones: what is products/21-A
, for example? Listing 11.8 shows a nice
way to get more information at very little cost.
Listing 11.8 Using functions to enrich the results from a MapReduce index
declare function addProductName(result) {
for (var i = 0; i < result.Products.length; i++) {
var p = load(result.Products[i].Product);
result.Products[i].ProductName = p.Name;
}
return result;
}
from index 'Companies/Purchases' as result
where result.Company = 'companies/13-A'
select addProductName(result)
The result of the query in Listing 11.8 is shown in Listing 11.9.
Listing 11.9 Output of the query in Listing 11.8, showing enriched result set of MapReduce index
{
"Company": "companies/13-A",
"Products": [
{
"Product": "products/21-A",
"Quantity": 1010,
"ProductName": "Sir Rodney's Scones"
},
{
"Product": "products/37-A",
"Quantity": 101,
"ProductName": "Gravad lax"
}
],
"Total": 1111
}
The query in Listing 11.8 should serve as a nice example of the kind of things we can do, but it's just the first taste. In addition to using projections in this manner, you can also define MapReduce fields to support full text search, apply suggestions and "more like this."
You can even query using facets on MapReduce indexes, an example of which you can see in Listing 11.10.
Listing 11.10 Facets also apply to MapReduce indexes
from index 'Companies/Purchases'
select facet(
Total < 5000,
Total between 5_000 and 9_999,
Total between 10_000 and 24_999,
Total between 25_000 and 49_999,
Total between 50_000 and 74_999,
Total > 75_000,
avg(Total)
)
Applying facets, which can do their own aggregation, to MapReduce is an interesting experience. It allows us to do things like build a MapReduce operation to aggregate some of the data in a static fashion, then allow
faceted queries to slice and dice it further. If you look at the results of the query in Listing 11.10, you'll note that
the faceted results also include an average value for each facet, and we have support for the usual (min
,
max
, sum
, avg
) aggregation methods.
In fact, that's not the only way to aggregate the data in multiple ways. There's also recursive MapReduce operations.
Recursive MapReduce
The sample dataset that we used consists of orders representing sales to companies. This is the typical e-commerce model and should be pretty familiar to you. A common requirement in such a system is reporting — in particular, pulling sales data on a daily, monthly and yearly basis.
Listing 11.11 shows the Products/DailySales
index, which gives us the details of sales by product by day.
Listing 11.11 Computing total daily sales by product for all orders
//map
from o in docs.Orders
from l in o.Lines
select new
{
o.OrderedAt.Date,
l.Product,
Count = l.Quantity
}
//reduce
from r in results
group r by new { r.Date, r.Product } into g
select new
{
g.Key.Date,
g.Key.Product,
Count = g.Sum(x => x.Count)
}
The output of the Products/DailySales
index can be seen in Figure 11.13. The only new thing we have in Listing 11.11 is the
group by
on multiple fields, the Date
and the Product
.
Next, we need to compute the same values per month and then per year. We can define the same index again and run the exact same computation but this time group by the product, year and month and then just by the product and year. But we currently have 83,830 orders in the database, and we'll likely have more. Doing the same operation on all these values again and again seems...inefficient.
RavenDB supports sending the output of a MapReduce index to a dedicated collection. Set the
Output reduce to collection
to DailyProductSales
. Figure 11.14 shows how this can be done.
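If you define the index from client code instead, the same setting is exposed on the index definition; in recent versions of the .NET client, the property is named OutputReduceToCollection. A sketch, reusing the Order class from the earlier sketch (extended with an OrderedAt DateTime property):
using System;
using System.Linq;
using Raven.Client.Documents.Indexes;

public class Products_DailySales : AbstractIndexCreationTask<Order, Products_DailySales.Result>
{
    public class Result
    {
        public DateTime Date;
        public string Product;
        public int Count;
    }

    public Products_DailySales()
    {
        // Map and reduce exactly as in Listing 11.11.
        Map = orders =>
            from o in orders
            from l in o.Lines
            select new { o.OrderedAt.Date, l.Product, Count = l.Quantity };

        Reduce = results =>
            from r in results
            group r by new { r.Date, r.Product } into g
            select new { g.Key.Date, g.Key.Product, Count = g.Sum(x => x.Count) };

        // Each reduce result is also written as an artificial document
        // in the DailyProductSales collection.
        OutputReduceToCollection = "DailyProductSales";
    }
}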
Make the modification to the Products/DailySales
index and save it. Then head to the Documents
tab and look at the
collections in the database. You can see the results in Figure 11.15.
In many respects, these artificial documents behave just like standard documents. You can load them, query them and even save modifications to them. Note that modifying an artificial document by hand is not recommended: the next update to the index will overwrite any changes you've made to it. Artificial documents are also not replicated. Since they're created by the index directly, they'll be created on the remote node as well, so there's no point in sending them over the wire.
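As an example of treating them like normal documents, here's how you might query the artificial collection from the .NET client (a sketch; DailySale is an assumed DTO matching the Date, Product and Count shape of the output):
using (var session = store.OpenSession())
{
    // An ordinary collection query; RavenDB will create a regular
    // auto-index over the artificial documents if one is needed.
    var chaiSales = session.Query<DailySale>(collectionName: "DailyProductSales")
        .Where(s => s.Product == "products/1-A")
        .ToList();
}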
Artificial documents are updated whenever the index completes a batch of documents, so there's very little lag time between the index picking up changes and the artificial documents' update. If this were the only thing artificial documents were good for, that wouldn't be of much use. After all, we already have the results as the output of the MapReduce index. So why do we need artificial documents?
The primary reason that artificial documents exist is so you can set up indexes on top of them. And that
includes additional MapReduce indexes. Take a look at Listing 11.12, showing just such an example of
the Products/MonthlySales
index, computing the monthly totals from the daily totals.
Listing 11.12 Recursive MapReduce index using artificial documents
//map
from ds in docs.DailyProductSales
select new
{
ds.Product,
Year = ds.Date.Year,
Month = ds.Date.Month,
ds.Count
}
//reduce
from result in results
group result by new
{
result.Product,
result.Year,
result.Month
}
into g
select new
{
g.Key.Year,
g.Key.Month,
g.Key.Product,
Count = g.Sum(x => x.Count)
}
You can see that the map
of the Products/MonthlySales
index is using the DailyProductSales
collection as it source, and the reduce
aggregates the data by product on a monthly basis. Let's examine
what's actually going on here in a little more depth.
Whenever an order is created or modified, the Products/DailySales
index will run, computing the updated
daily totals for the products in the new order. As a result of this index running, artificial documents will
be created (with IDs such as DailyProductSales/3183876884104958208
).
Because a DailyProductSales
document was created, the Products/MonthlySales
index will run on the changed daily
tallies to update its own numbers. We can even set things up so we'll have an output collection for the
MonthlyProductsSales
as well, and then define a Products/YearlySales
. The recursive nature of the indexing naturally
extends in this manner.
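Carrying the chain one more step, a Products/YearlySales index might look like the following sketch. It assumes the Products/MonthlySales index was given an output collection named MonthlyProductsSales, with the shape produced in Listing 11.12:
//map
from ms in docs.MonthlyProductsSales
select new
{
    ms.Product,
    ms.Year,
    ms.Count
}
//reduce
from result in results
group result by new { result.Product, result.Year } into g
select new
{
    g.Key.Product,
    g.Key.Year,
    Count = g.Sum(x => x.Count)
}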
Artificial documents do have a few limitations that you should be aware of:
- RavenDB will detect and generate an error if you have a cycle of artificial documents. In other words, you can't define another index that will output artificial documents if that will trigger (directly or indirectly) the same index. Otherwise, you might set up a situation where the indexes run in an infinite loop.
- You must choose an empty collection. RavenDB will not allow you to output artificial documents into a preexisting collection. This is done because RavenDB will overwrite any document in the collection, so it prevents the option of overwriting existing documents.
- The document identifiers for the artificial documents are generated by RavenDB (based on the hash of the reduce key), and you don't have any control over them.
- Artificial documents are not sent over replication and cannot use revisions or attachments.
Artificial documents and subscriptions
You can use subscriptions and artificial documents together. (In fact, along with recursive MapReduce, that's one of the primary reasons they exist.) But you need to be aware of a small wrinkle in this setup. Because artificial documents aren't sent via replication, each node in the database group is going to have its own (independent) copy of the results. The contents are the same, but the subscription has no way of knowing this.
Because of this issue, it's recommended to use artificial documents with subscriptions only on a single node. Failing the subscription over to another node may cause it to send artificial documents that were already acknowledged. You can configure this by disabling the dynamic task distribution as part of the subscription configuration.
Artificial documents and recursive MapReduce are a good match, but before you turn your RavenDB instances into a reporting powerhouse, there's another possible topology to consider. Instead of defining these indexes and processes on the main database, you can set up external replication to a dedicated database (on the same cluster or on a separate one) and run all that work there.
This can simplify distribution of work as the application grows. If you have many such indexes and a high rate of changes, being able to isolate the work to a particular database group (and thus specific nodes) can be very helpful.
MultimapReduce indexes
In the previous chapter, Listing 10.12 demonstrated the usage of multimap indexes to index several collections and
unite the results into a single index. This can be great for when you want to search over several things at the same
time. The example we looked at in Chapter 10 searched for a person by name, where the person can be an Employee
, a
Contact
on a Company
or a Contact
for a Supplier
. Regardless, we were able to search for that person easily
and in a convenient manner.
MultimapReduce indexes allow us to extend that behavior to also include aggregation across multiple sources in a natural way. Let's first look at Listing 11.13, showing off this feature, and then explore what this means.
Go to the Indexes
page and create a new index. Click on Add map
twice and create the three maps. Then click Add Reduction
and add the reduce. Name the new index Cities/Details
and click Save
.
Listing 11.13 MultimapReduce index that sums the points of interest in each city
// map #1
from c in docs.Companies
select new
{
c.Address.City,
Companies = 1,
Suppliers = 0,
Employees = 0
}
// map #2
from s in docs.Suppliers
select new
{
s.Address.City,
Companies = 0,
Suppliers = 1,
Employees = 0
}
// map #3
from e in docs.Employees
select new
{
e.Address.City,
Companies = 0,
Suppliers = 0,
Employees = 1
}
//reduce
from result in results
group result by result.City
into g
select new
{
City = g.Key,
Companies = g.Sum(x => x.Companies),
Suppliers = g.Sum(x => x.Suppliers),
Employees = g.Sum(x => x.Employees),
}
Take a few minutes to look at Listing 11.13. There's a lot of stuff going on there. We define three maps, on the
Companies
, Suppliers
and Employees
. And for each, we output a count for the type of the document we're mapping, as well as the relevant City
. Finally, on the reduce, we simply group by City
and then sum up all the results
from all the intermediate steps to get the final tally. Listing 11.14 shows the output from this index for London.
Listing 11.14 Output of the 'Cities/Details' index for London
{
"City": "London",
"Companies": 6,
"Suppliers": 1,
"Employees": 4
}
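For reference, the output in Listing 11.14 is what comes back from a query such as the following, sketched with the .NET client (CityDetails is an assumed DTO with City, Companies, Suppliers and Employees properties):
using (var session = store.OpenSession())
{
    var london = session.Query<CityDetails>("Cities/Details", isMapReduce: true)
        .Where(c => c.City == "London")
        .ToList();
}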
The Cities/Details
index is interesting, in the sense that it shows off capabilities, but it isn't really that
exciting. It also operates on a small dataset, even if it touches multiple collections.
Let's modify the index to also operate on orders.
We'll first add another entry for each of the existing maps: OrderTotal = 0
. And we'll add the same logic for the reduce
function: OrderTotal = g.Sum(x => x.OrderTotal)
. The first step is required because all the maps and reduces in an index
must have the same output. The second is required to actually sum up the information we'll shortly add. Now click on the
Add map
button on the index edit page and add the map shown in Listing 11.15.
Listing 11.15 Adding total order per city to the 'Cities/Details' index
from o in docs.Orders
select new
{
o.ShipTo.City,
Companies = 0,
Suppliers = 0,
Employees = 0,
OrderTotal = o.Lines.Sum(
x => x.Quantity * x.PricePerUnit
)
}
We now have an index that can give us a lot of interesting details about each city. Listing 11.16 shows the updated output for London in the database.
Listing 11.16 London's information now includes the total revenue for orders shipped to it
{
"City": "London",
"Companies": 6,
"Suppliers": 1,
"Employees": 4,
"OrderTotal": 4107034.71
}
As you can imagine, this kind of behavior is powerful because it allows you to pull data from disparate parts of your system and aggregate it with very little work. Querying this information is effectively free, so that also makes it much easier to consume and work with.
MapReduce indexes are also just indexes
It bears repeating that a MapReduce index is also just an index. We focused heavily in this chapter on the Reduce portion of such indexes, but it's important to remember that they're also capable of doing everything that a map-only index can do.
In other words, if we had a
Cities
collection that also had the coordinates of each city, we would be able to modify the Cities/Details
index to be a MultimapReduce index that also provides spatial queries. You can use facets on a MapReduce index, apply suggestions, run full text queries, etc. All too often, I see users assume the Reduce part of the index is where the functionality stops. In fact, this is where it begins.
Dynamic aggregation with MapReduce indexes
MapReduce indexes are a wonderful way to handle aggregation. RavenDB's ability to precompute and answer complex aggregation cheaply can make a world of difference in your ability to deliver features in your applications. The fact that showing an aggregated value doesn't require you to set up an off-hours job, monitor it, clear caches, etc. is a force multiplier for your application's capabilities.
However, while the MapReduce operation can seem magical at times, it's a tool fit for a purpose. Trying to use it for a different purpose than it's intended for will produce suboptimal results. MapReduce indexes are great when
- The fields you're aggregating by are known in advance.
- The source data you're aggregating is known in advance.
In other words, MapReduce is great for static aggregation. If you want to get the daily totals by product, MapReduce is the perfect solution. The data source (all orders) is known. What you're aggregating by (the date and the product) is known in advance, too. RavenDB is able to generate the appropriate MapReduce operation, compute the result and allow you to query it easily and cheaply.
But what happens if we want to aggregate daily sales by product only for London? Well, that's easy. We can define another MapReduce index that aggregates the results by date, product and city. We get the same benefits, and everyone is happy. But the next request is to get the daily sales by city based on the supplier, not the product. And the next request after that is to aggregate the sales by the employee on a monthly basis, and the one after that is to see the yearly sales by product only for specific customers, and then...
I think you get the picture. MapReduce indexes are great when the type of aggregation is known in advance. In this case, RavenDB is able to prepare everything and have an answer ready for you by the time you query. But if your queries are dynamic and fluid, and if you're changing what you're querying on and how you're doing the aggregation, this is much more complex.
One option would be to define a MapReduce index for each type of aggregation you need. This works, but you might end up with a lot of MapReduce indexes. That can be fine. RavenDB is very efficient in its handling of MapReduce indexes (they're typically cheaper to run than a map-only index, actually), but a large number of indexes still means that every change has to be processed by each of those indexes, multiplying the work required.
A better approach for dynamic aggregation is to use facets. To see the total sales for egg noodles and cheese in the first week of July 1996, we can run the query in Listing 11.17.
Listing 11.17 Using facets on top of a MapReduce index to achieve dynamic aggregation
from index 'Products/DailySales'
where Date between '1996-07-01' and '1996-07-07'
and Product in ('products/11-A', 'products/42-A')
select facet(Product, sum(Count))
Let's break the query in Listing 11.17 apart, one clause at a time. First, we select the MapReduce index Products/DailySales
,
which we've already seen in Listing 11.11. The where
clause specifies the date range we want to query and the particular products
we're interested in. Note that the Date
query is using between
and relies on the fact that we do lexical comparisons to
get a clear syntax for the date range.
The select
clause is using facets, but unlike the facets queries we looked at in the previous chapter, we're now adding
a new wrinkle in the form of sum(Count)
. This allows us to do an aggregation operation over the query results. In this way, we can dynamically control
what will be aggregated.
Cost analysis for dynamic aggregation (facets) vs. static aggregation (MapReduce)
An important distinction needs to be made about the cost structures of using aggregation in MapReduce vs. facets. For MapReduce, by the time you're querying the information, it's already been aggregated. You're actually doing a search on top of the already precomputed results.
Aggregation with facets (which is sometimes also called dynamic aggregation) requires us to run over the results of the query and compute the final aggregated values. Note that the amount of results the faceted query needs to aggregate is just the query results, not the total size of the source data.
This can be confusing, so we'll use the example of the Products/DailySales index and the faceted query in Listing 11.17 to clear things up. We have the following values:
- D — the total number of orders. In this database, after running the patch operation in Listing 11.2, that's 83,830 documents.
- R — the number of unique reduce keys after the MapReduce aggregation. In the case of the Products/DailySales index, that's the number of unique (date, product) pairs we're grouping by: 2,106 unique reduce keys for the dataset we have.
- Q — the number of results matched by the query. For the query in Listing 11.17 (sans facets), that number is two.
With these values, we can now give proper estimates of the costs of various queries. A typical cost of a MapReduce query is O(log R), so the cost of a query on Products/DailySales would be about 11 operations.
A faceted query on all orders will have a cost of O(D), with D equal to 83,830. This computation is done each time the query is run. However, the only reason the cost is O(D) here is that we queried over all orders. Because the query in Listing 11.17 matches only two results, the cost of actually aggregating them is effectively nil.
It's a great way to handle such scenarios: use MapReduce to do the first level of the aggregation, and then use dynamic aggregation with facets to further slice and dice the data as you need.
The result of the faceted query is affected by the matches for the query, and that can change dynamically with very little cost, opening the way to more dynamic aggregation queries. Dynamic aggregation also has a pretty straightforward cost, since it varies linearly with the number of matches for the query.
That's a great thing since it simplifies dynamic queries. But it also means that if you want to run a faceted query with aggregation on a very large result set, it's going to take time to process. A MapReduce on the same (or much larger) amount of data will be much faster at querying time, but it's limited in the amount of flexibility it allows for each query.
Combining MapReduce and facets to handle this is a great way to reduce¹ the amount of data the facets need to go through. It's easiest to consider this kind of approach as feeding the facets baby food, already pre-chewed. That dramatically cuts down the amount of effort required to get the final result.
Multi-step aggregation processes
We've looked at all sorts of MapReduce indexes in this chapter, and we also looked at how we can use dynamic aggregation to build dynamic aggregated queries. Now I want to apply all that knowledge to a more complex example, utilizing many of the features we explored in this chapter.
Here's the requirement: we want to be able to get a monthly report of sales by supplier per city. It might be easier to understand if we start from the point of view of the user interface. Take a look at Figure 11.16, which shows a mockup of the search interface and the results we want to see.
The simplest way to be able to answer this query is to use LoadDocument
in the map phase of the index. You can see how
this is done in Listing 11.18.
Listing 11.18 MapReduce index using LoadDocument
can pull data from related documents
//map
from o in docs.Orders
from l in o.Lines
let product = LoadDocument(l.Product, "Products")
select new
{
o.ShipTo.City,
o.ShippedAt.Month,
o.ShippedAt.Year,
Supplier = product.Supplier,
Total = l.PricePerUnit * l.Quantity
}
// reduce
from result in results
group result by new
{
result.City,
result.Supplier,
result.Month,
result.Year
}
into g
select new
{
g.Key.City,
g.Key.Supplier,
g.Key.Month,
g.Key.Year,
Total = g.Sum(x => x.Total)
}
The interesting bit in Listing 11.18 is the LoadDocument
call in the map. Everything else is pretty much the same as we've
done throughout this chapter. We looked at LoadDocument
in the previous chapter, and it serves mostly the same role in
MapReduce indexes as well. If the product document has changed, we'll have to re-index all the documents that referenced it.
Because of this, the LoadDocument
option is only available during the map phase of a MapReduce index. You can't call
LoadDocument
from the reduce.
A sample query for this index can be seen in Listing 11.19.
Listing 11.19 This query can be used to render the results in Figure 11.16.
from index 'Sales/ByCityAndSupplier' as t
where t.City = 'London' and t.Month = 2 and t.Year = 1997
order by t.Total as double desc
load t.Supplier as s
select s.Name, t.City, t.Supplier, t.Month, t.Year, t.Total
This works. It generates the results you see in Figure 11.16. However, there's a problem here. LoadDocument
requires
that RavenDB updates the index if the referenced document (the product's document, in this case) is updated. This means that
an update to a product can force RavenDB to re-index all the orders that have this product. If it's a popular product, this
may require RavenDB to re-index a large number of orders.
Ideally, we want to have LoadDocument
where the number of referencing documents is bounded and small. Is there a better way
to handle this? We can change the index in Listing 11.18 so it doesn't include the Supplier
, and instead groups things
only by (product, city, month, year). We'll also define an output collection for the results.
The output collection is then used in another index where we'll use a LoadDocument
to achieve the same output.
Why all of this complexity? Aren't we in exactly the same position as we were before? An update to a product document will
force us to re-index, after all. And the answer is yes, it will force us to re-index, but the question is what. In the case
of Listing 11.18, any change to a product will force re-indexing of all the Orders
documents that referenced it. But in
the case where we have an intermediate artificial documents collection, we'll only need to re-index the ones that reference the
modified product.
Those documents have already gone through a MapReduce process, and there are likely to be far fewer of them than there are orders, so this is a net win in terms of the total amount of work that has to be done.
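To make the two-stage design concrete, here's a sketch of both indexes. The index names and the ProductSalesByCityMonthly output collection are illustrative assumptions, not something predefined in the sample database:
// Stage 1: group by (product, city, month, year); no LoadDocument.
// 'Output reduce to collection' is set to ProductSalesByCityMonthly.
//map
from o in docs.Orders
from l in o.Lines
select new
{
    l.Product,
    o.ShipTo.City,
    o.ShippedAt.Month,
    o.ShippedAt.Year,
    Total = l.PricePerUnit * l.Quantity
}
//reduce
from r in results
group r by new { r.Product, r.City, r.Month, r.Year } into g
select new
{
    g.Key.Product,
    g.Key.City,
    g.Key.Month,
    g.Key.Year,
    Total = g.Sum(x => x.Total)
}

// Stage 2: map over the much smaller artificial collection. A product
// update now forces re-indexing of a handful of tally documents rather
// than of every order that contains the product.
//map
from s in docs.ProductSalesByCityMonthly
let product = LoadDocument(s.Product, "Products")
select new
{
    s.City,
    s.Month,
    s.Year,
    Supplier = product.Supplier,
    s.Total
}
//reduce
from r in results
group r by new { r.City, r.Supplier, r.Month, r.Year } into g
select new
{
    g.Key.City,
    g.Key.Supplier,
    g.Key.Month,
    g.Key.Year,
    Total = g.Sum(x => x.Total)
}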
Summary
This chapter covered a lot of ground. I tried to strike the right balance between giving you enough information about what's actually going on behind the scenes and drowning everything in implementation details.
The key takeaway I hope you'll get from this chapter is that aggregation in RavenDB is cheap and plentiful, so have at it. This is in direct contrast to the way things usually work with other databases. Aggregation is often expensive and hard, so it gets pushed to dedicated solutions (nightly runs, reporting databases, cached queries, etc.). None of this is needed with RavenDB.
We started this chapter by writing some simple RQL queries using group by
, and it just worked. The query optimizer recognized
the query and generated the appropriate MapReduce index, and we were off to the races. We then took a deep, hard look at what was
actually going on there, analyzing what the index was doing and how MapReduce works inside RavenDB.
In order to better understand that, RavenDB has a few tools built in to help you. Chief among them is the MapReduce visualizer, which lets you peek deep into the heart of how RavenDB executes MapReduce operations. We looked at the different behaviors that happen when we have a small amount of items to aggregate for a particular reduce key (all items are reduced in a single group) and when we have a very large amount of items (reduce is handled in a recursive tree fashion).
This means that updates to MapReduce indexes are very fast because we usually need to do a minimal amount of work to get the updated results. After learning how RavenDB processes MapReduce indexes, we learned how we can define our own, with complete freedom of how we want to structure the data and aggregate it.
We also learned there are some things to remember when you build your indexes, such as making sure that the map
and reduce
functions are
pure, that your reduce
function can handle being called recursively and that the reduce
outputs less data than we put in.
We then upped the ante and looked at more complex MapReduce indexes, grouping all the results by company. Inside each company, we grouped things again by product to get a detailed summary of the results. These kinds of indexes can allow you to do sophisticated aggregation, rollups and computation during the indexing. Then they expose the whole thing, readily consumable, to your application.
MapReduce indexes can do amazing things, but they're also indexes in RavenDB. That means that, in addition to whatever you're doing in the MapReduce operation, all the usual stuff you can do with indexes is also there. We looked at how we can use JavaScript projections to enrich the MapReduce output during query, use facets on the data (including additional aggregation) and the many more options (full text search, spatial, etc.).
If a single MapReduce index isn't enough, you can always try things recursively. Indeed, RavenDB allows MapReduce indexes to output artificial documents back to the database, and these documents are a key part of allowing RavenDB to handle recursive MapReduce indexes. We looked at how we can do a daily and monthly rollup of sales per product by creating a MapReduce index that's fed off the artificial documents collection that's created by another MapReduce index. This opens up the option to do some really cool things because you can pipeline the work and take advantage of what's already been done.
Another cool option we looked at was the ability to use MultimapReduce indexes. Instead of just aggregating data from a single collection, we can aggregate the data from several of them at once. That allows you to, with very little effort, paint a picture of what's actually going on in your database and gain insight into what you're doing. We saw a good example of that when we looked at what was going on in each city and were able to tell how many companies, employees, suppliers and sales we had in each location.
MapReduce is fantastic when you have a static aggregation scenario — when you know what you're aggregating ahead of time. Otherwise, you'll need to (either directly or via the query optimizer) generate MapReduce indexes for each permutation that you want to query. Another alternative to that is the notion of dynamic aggregation, using facets to slice and dice the information dynamically. This tends to be less efficient than a MapReduce index, but it has less upfront costs.
It's common to do that in two stages: first defining the rough shape of the aggregation in a MapReduce index, efficiently doing the aggregation, and then using facets and dynamic aggregation on that much smaller result set to narrow down things more accurately. This gives you both fast aggregations and more flexibility in the queries.
We finished the chapter by looking at aggregating documents by a related document, using LoadDocument
in a MapReduce
index. This works, and it can be quite an elegant solution for some scenarios. But it's also possible to get into trouble with
this approach because an update to a referenced document requires re-indexing of all the referencing documents. Instead, we
can utilize artificial documents and two MapReduce indexes to reduce the amount of work required when we need to
re-index.
The MapReduce engine inside RavenDB is very flexible, and it's been used to great effect over the years — including in many scenarios that surprised RavenDB's own development team. Aggregation of results is only the most obvious of the options that are available to you.
In the next chapter, we'll switch gears a bit and move to a more practical mindset, talking about how to utilize and work with indexes in your applications.
¹ Pun very much intended here.