By DB Tsai and Jenny Thompson.
In the Alpine development team, we are always looking for ways to improve the efficiency of our algorithms. One of the most widely applicable and effective fixes we have found is to implement the in-mapper combiner design pattern in our Hadoop-based algorithms. This can dramatically cut the amount of data transmitted across the network and speed up an algorithm by 20% to 50%.
For example, to implement the C4.5 decision tree algorithm, we need to compute the information gain, which is essentially just counting all the different combinations of independent and dependent variables; as a result, aggregating the results in the mapper instead of emitting one record per row greatly increases performance. We also use this technique in our naive Bayes classifier, linear regression, and correlation operators.
A high-level description and pseudo-code example can be found in Data-Intensive Text Processing with MapReduce by Jimmy Lin and Chris Dyer (2010). The pattern has been used in several MapReduce applications; for example, Pig 0.10+ applies it automatically. However, there are currently no ready-to-use concrete examples to be found online. Here we present a step-by-step example with code, a word-count use case, and benchmarks.
An in-mapper combiner is much more efficient than a traditional combiner because it continually aggregates the data. As soon as it receives two values with the same key, it combines them and stores the resulting key-value pair in a HashMap. However, if there are too many distinct keys, it may run out of memory. To avoid this problem, we use a Least Recently Used (LRU) cache.
For each incoming key-value pair, we either add it as a new entry in the HashMap or combine it with the existing entry for that key. If the HashMap grows beyond the cache capacity, the least recently used key-value pair is written to the context, freeing space in the HashMap for the new incoming key. Finally, in the cleanup method, any remaining entries are written to the context.
In contrast, when a mapper with a traditional combiner (a mini-reducer) emits key-value pairs, they are collected in a memory buffer, and the combiner then aggregates each batch of key-value pairs before sending them to the reducer. The drawbacks of this approach are:
- The execution of the combiner is not guaranteed, so MapReduce jobs cannot depend on it.
- Hadoop may store the key-value pairs in the local filesystem and run the combiner later, which causes expensive disk I/O.
- A combiner only combines data within the same buffer, so we may still generate a lot of network traffic during the shuffle phase even if most of the keys emitted by a single mapper are the same. To see this, consider the word count example: assume the buffer size is 3, and one mapper emits <key, value> = <Stanford, 3>, <Berkeley, 1>, <Stanford, 7>, <Berkeley, 7>, and <Stanford, 2>. The first three items will be in one buffer and the last two in the other buffer; as a result, the combiner will emit <Stanford, 10>, <Berkeley, 1>, <Berkeley, 7>, <Stanford, 2>. With an in-mapper combiner, we get <Stanford, 12>, <Berkeley, 8>.
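The arithmetic in the last bullet can be checked with a short, self-contained simulation (the class and helper names below are ours, not part of the project):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombinerComparison {
    // A traditional combiner only merges pairs that land in the same buffer.
    static List<Map.Entry<String, Integer>> bufferedCombine(
            List<Map.Entry<String, Integer>> pairs, int bufferSize) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (int start = 0; start < pairs.size(); start += bufferSize) {
            Map<String, Integer> buffer = new LinkedHashMap<>();
            for (int i = start; i < Math.min(start + bufferSize, pairs.size()); i++) {
                buffer.merge(pairs.get(i).getKey(), pairs.get(i).getValue(), Integer::sum);
            }
            out.addAll(buffer.entrySet());
        }
        return out;
    }

    // An in-mapper combiner aggregates across the whole map task.
    static Map<String, Integer> inMapperCombine(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> state = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            state.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = List.of(
            Map.entry("Stanford", 3), Map.entry("Berkeley", 1),
            Map.entry("Stanford", 7), Map.entry("Berkeley", 7),
            Map.entry("Stanford", 2));
        System.out.println(bufferedCombine(pairs, 3)); // [Stanford=10, Berkeley=1, Berkeley=7, Stanford=2]
        System.out.println(inMapperCombine(pairs));    // {Stanford=12, Berkeley=8}
    }
}
```

The buffered version emits four pairs while the in-mapper version emits only two, which is exactly the shuffle-traffic difference described above.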
Here is the implementation of the in-mapper combiner with an LRU cache that we use:
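As a rough sketch of how such a combiner can be built on java.util.LinkedHashMap in access order (the class name, generic signature, and callback names here are ours; a production version would write to Hadoop's Context):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;

// Sketch of an in-mapper combiner backed by an LRU cache.
// A LinkedHashMap in access order evicts its least recently used
// entry to the emit callback once capacity is exceeded.
public class LruCombiner<K, V> {
    private final BiConsumer<K, V> emit;    // e.g. context::write in a real mapper
    private final BinaryOperator<V> reduce; // how two values for the same key combine
    private final LinkedHashMap<K, V> cache;

    public LruCombiner(int capacity, BinaryOperator<V> reduce, BiConsumer<K, V> emit) {
        this.emit = emit;
        this.reduce = reduce;
        this.cache = new LinkedHashMap<K, V>(capacity, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > capacity) {
                    // Spill the LRU entry to the context to make room.
                    emit.accept(eldest.getKey(), eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    // Combine with the cached value if present, otherwise insert.
    public void write(K key, V value) {
        V old = cache.get(key); // get() also refreshes the LRU order
        cache.put(key, old == null ? value : reduce.apply(old, value));
    }

    // Call from the mapper's cleanup(): emit everything still cached.
    public void flush() {
        cache.forEach(emit);
        cache.clear();
    }
}
```

In a mapper this would be constructed in setup() with context::write as the emit callback, written to from map(), and flushed from cleanup().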
The API is pretty simple: just change context.write to combiner.write, and remember to flush the cache in the cleanup method.
Here is an example implementing the classic Hadoop word-count mapper with the in-mapper combiner:
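A real mapper would extend org.apache.hadoop.mapreduce.Mapper, call combiner.write from map(), and flush in cleanup(); the following dependency-free sketch mirrors that map-side logic so it can run standalone (all names are ours, and the cache logic is repeated here so the sketch is self-contained):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Word-count map logic with an in-mapper combiner.
// In a real Hadoop mapper, the emitted list would be context.write(...) calls,
// the loop body would live in map(), and the final flush in cleanup().
public class WordCountSketch {
    static List<String> mapAll(List<String> lines, int capacity) {
        List<String> emitted = new ArrayList<>();
        LinkedHashMap<String, Integer> cache =
            new LinkedHashMap<String, Integer>(capacity, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
                    if (size() > capacity) {
                        // Spill the least recently used word count.
                        emitted.add(eldest.getKey() + "\t" + eldest.getValue());
                        return true;
                    }
                    return false;
                }
            };
        for (String line : lines) {
            StringTokenizer tok = new StringTokenizer(line.toLowerCase());
            while (tok.hasMoreTokens()) {
                String word = tok.nextToken();
                Integer old = cache.get(word); // refreshes LRU order
                cache.put(word, old == null ? 1 : old + 1);
            }
        }
        // cleanup(): flush whatever is still cached.
        cache.forEach((k, v) -> emitted.add(k + "\t" + v));
        return emitted;
    }
}
```

Note that instead of emitting <word, 1> once per token, the mapper emits at most one partial count per word per spill, which is what shrinks the shuffle.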
The source code of this example project can be downloaded at hadoop-word-count.
After you clone the project, you can build the jar file by
you will get
mapreduce/target/mapreduce-0.1.jar. Now, let's download the English Wikipedia dataset (the 2 October 2013 dump is approximately 9.5 GB compressed, 44 GB uncompressed). Since the file is in XML format, we need to convert it to plain text in order to run the MapReduce word count. We'll use WikiExtractor.py to perform the conversion. The following command downloads WikiExtractor.py and the Wikipedia dataset, and then converts it to plain text. It may take a couple of hours depending on your network and machine.
Now, let's upload the data to HDFS and run the jobs:
Which one will win? It turns out berkeley has the higher count, by:
Finally, let's compare the performance. We tested on a three-node CDH4 cluster with a Gigabit network.
- Word Count: 9 min 9 sec
- Word Count with Combiner: 5 min 21 sec
- Word Count with In-Mapper Combiner: 4 min 17 sec
You can see that the typical combiner is 1.71 times faster than the word count without any optimization (549 s vs. 321 s), and the in-mapper combiner is a further 1.25 times faster than the typical combiner (321 s vs. 257 s). Depending on the application and the distribution of the input dataset, we see the performance gain between the typical combiner and the in-mapper combiner vary from 20% to 50%.
If you study the job details, you will find that with the in-mapper combiner the number of map output records is significantly smaller than with the typical combiner; as a result, the reduce shuffle bytes shrink as well. Both contribute to the performance improvement.