I've also decided to introduce Kotlin 1.1 RC into the project, to start learning what's new in this version. As always, you can find the code for this post in its repository.
In this post, I'm going to introduce the Hadoop concept of a Combiner, and also show how to define custom Counters and Properties in a MapReduce program. The example this time is the n-gram count problem with a keyword for filtering the results. Given one or more text files as input, for each line we generate, for every word, the sequence of at most n words that starts at that word. We then filter out all the n-grams that start with the given keyword and count how many times each remaining sequence appears in the input.
Let's start with the easy ones, Properties and Counters. A property is an attribute of the program that we want to share across our working context; it is usually read-only, something we have no interest in changing during the execution. In our example we will use properties to store the value n of our n-grams and the keyword to use for filtering.
A counter, instead, is a statistic we want to keep track of while the program runs. In this case we want to know how many n-grams are filtered out because they start with our keyword, so we will increment the counter by 1 every time we discard one of them.
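To make the two ideas concrete before the full program, here is a minimal sketch of both mechanisms in isolation. The property name "numWords" and the counter enum mirror the real code below; the rest is purely illustrative.

import org.apache.hadoop.conf.Configuration

// Counters are declared as an enum: Hadoop uses the enum class and constant
// names to group and label them in the job output.
enum class NGramCounters {
    REMOVED_BY_KEYWORD
}

fun propertySketch() {
    val conf = Configuration()
    // The driver stores a read-only property in the job configuration...
    conf.set("numWords", "3")
    // ...and every task can read it back through its Context's configuration.
    val n = conf.get("numWords").toInt()
    println("building n-grams of length $n")
}

// Inside a Mapper or Reducer, a counter is updated through the task context:
//   context.getCounter(NGramCounters.REMOVED_BY_KEYWORD).increment(1)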
import common.hadoop.extensions.*
import org.apache.hadoop.conf.Configured
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.util.Tool

class NGramDriver : Configured(), Tool {

    enum class NGramCounters {
        REMOVED_BY_KEYWORD
    }

    override fun run(args: Array<out String>): Int {
        if (args.size != 5) {
            error("Expected <#reducers> <input_path> <output_path> <n> <keyword>, " +
                    "but received ${args.size} elements.")
        }

        val numReducers = args[0].toInt()
        val inputPath = Path(args[1])
        val outputPath = Path(args[2])
        val n = args[3].toInt()
        val keyword = args[4]

        val config = this.conf
        // Set the read-only properties shared with the tasks.
        config.set("numWords", n.toString())
        config.set("keyword", keyword)

        with(Job.getInstance(config)) {
            jobName = "Big Data - N-Gram Count"

            addPaths(inputPath, outputPath)

            setJarByClass<NGramDriver>()
            setInputFormatClass<TextInputFormat>()
            setOutputFormatClass<TextOutputFormat<Text, IntWritable>>()

            setMapper<NGramMapper, Text, IntWritable>()
            // The Combiner's input types match the Mapper's output types,
            // and its output types match the Reducer's input types.
            setCombinerClass<NGramCombiner>()
            setReducer<NGramReducer, Text, IntWritable>(numReducers)

            return if (waitForCompletion(true)) 0 else 1
        }
    }
}
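The snippet above only defines the Tool; the post doesn't show the entry point, but a driver like this is conventionally launched through ToolRunner, roughly like so (a sketch, assuming no extra configuration is needed):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.ToolRunner
import kotlin.system.exitProcess

fun main(args: Array<String>) {
    // ToolRunner parses the generic Hadoop options and invokes NGramDriver.run().
    exitProcess(ToolRunner.run(Configuration(), NGramDriver(), args))
}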
import common.hadoop.extensions.split
import common.hadoop.extensions.toIntWritable
import common.hadoop.extensions.toText
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Mapper
import java.util.regex.Pattern

class NGramMapper : Mapper<LongWritable, Text, Text, IntWritable>() {

    override fun map(key: LongWritable, value: Text, context: Context) {
        val words = value.split(Pattern.compile("\\W+"))
        // Read the n-gram length from the job configuration.
        val numWords = context.configuration
                .get("numWords")
                .toInt()

        // For each word, build the n-gram of at most numWords words that
        // starts at that position, then emit it with a count of 1.
        (0 until words.size).map { Pair(it, StringBuilder().append(words[it])) }
                .onEach {
                    val (index, builder) = it
                    ((index + 1) until words.size).take(numWords - 1)
                            .forEach { builder.append(" ").append(words[it]) }
                }.map { it.second.toString() }
                .forEach { context.write(it.toText(), 1.toIntWritable()) }
    }
}
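To see what the Mapper actually emits, here is the same sliding logic as plain Kotlin, stripped of the Hadoop types (an illustration, not code from the repository):

fun ngrams(words: List<String>, n: Int): List<String> =
        words.indices.map { i ->
            // Take at most n words starting at position i.
            words.subList(i, minOf(i + n, words.size)).joinToString(" ")
        }

fun main() {
    // Prints: [big data is, data is big, is big, big]
    println(ngrams(listOf("big", "data", "is", "big"), 3))
}

With the keyword "big", the first and last of these would later be discarded and counted in REMOVED_BY_KEYWORD.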
A Combiner sits between the Mapper and the Reducer. The idea is that, in order to reduce the amount of data the Mapper sends over the network, we can combine that data in some way while it is still in memory, shrinking what is shipped to the Reducer. A Combiner, like the Reducer, extends the org.apache.hadoop.mapreduce.Reducer class, and its input and output key:value types are the same as the Reducer's input types. Considering our example, instead of sending the same n-gram multiple times, or sending an n-gram that is going to be filtered out anyway, we can merge those records in the Combiner phase: we count up front how many times each n-gram appears, and we immediately discard the n-grams that match our keyword. For instance, if a Mapper emits ("big data", 1) three times, the Combiner can collapse them into a single ("big data", 3) record before anything crosses the network.
package common.hadoop.extensions

import org.apache.hadoop.mapreduce.Counter

// Allows incrementing a Hadoop Counter with the ++ operator.
operator fun Counter.inc(): Counter {
    this.increment(1)
    return this
}

// Allows adding to a Hadoop Counter with the += operator.
operator fun Counter.plusAssign(value: Long) {
    this.increment(value)
}
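With these extensions in scope, updating a counter from a task reads like plain arithmetic. A small usage sketch (the function is hypothetical; the Counter would come from context.getCounter(...)):

import org.apache.hadoop.mapreduce.Counter

fun usageSketch(counter: Counter) {
    counter += 5L      // plusAssign: add five occurrences at once

    var c = counter    // inc() needs an assignable reference
    c++                // add a single occurrence
}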
import NGramDriver.NGramCounters.REMOVED_BY_KEYWORD
import common.hadoop.extensions.plusAssign
import common.hadoop.extensions.toIntWritable
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Reducer

/**
 * Combiner for the problem. Hadoop may run it zero, one, or more times,
 * so correctness must not depend on it running.
 * Its input and output types are the same as the Reducer's for the reason
 * explained above.
 */
class NGramCombiner : Reducer<Text, IntWritable, Text, IntWritable>() {

    override fun reduce(key: Text, values: Iterable<IntWritable>, context: Context) {
        val keyword = context.configuration
                .get("keyword")

        // If the key starts with the keyword, the counter is increased by
        // the number of times the n-gram appears in the text; otherwise
        // the key is written out with the sum of its occurrences.
        if (key.toString().startsWith(keyword)) {
            context.getCounter(REMOVED_BY_KEYWORD) += values.sumBy(IntWritable::get).toLong()
        } else {
            context.write(key, values.sumBy(IntWritable::get).toIntWritable())
        }
    }
}
[NB] I'm not completely sure about this last point on how many times the Combiner runs, so if somebody can give me more info about it, it would be greatly appreciated.
[NB.2] The Combiner and the Reducer code are exactly the same, but for the sake of readability I decided to write two separate classes.
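For completeness, since the two classes share the same body, the Reducer presumably looks like this (reconstructed from the Combiner above, not taken verbatim from the repository):

import NGramDriver.NGramCounters.REMOVED_BY_KEYWORD
import common.hadoop.extensions.plusAssign
import common.hadoop.extensions.toIntWritable
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Reducer

class NGramReducer : Reducer<Text, IntWritable, Text, IntWritable>() {

    // Same body as NGramCombiner: filter the n-grams matching the keyword
    // (updating the counter) and sum the occurrences of the rest.
    override fun reduce(key: Text, values: Iterable<IntWritable>, context: Context) {
        val keyword = context.configuration.get("keyword")

        if (key.toString().startsWith(keyword)) {
            context.getCounter(REMOVED_BY_KEYWORD) += values.sumBy(IntWritable::get).toLong()
        } else {
            context.write(key, values.sumBy(IntWritable::get).toIntWritable())
        }
    }
}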
At the end of the execution, the value of the counter can be printed to the screen from the Driver class, but it is easier to simply look at the output of the Hadoop execution, which prints the counter result as shown in the image below.
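If you do want to print it from the Driver, here is a minimal sketch of how, inside run(), the job's counters can be read once the job has finished:

// Inside the with(Job.getInstance(config)) block of the Driver:
val succeeded = waitForCompletion(true)

// Job.getCounters() exposes all counters of the finished job.
val removed = counters.findCounter(NGramCounters.REMOVED_BY_KEYWORD).value
println("n-grams removed by keyword: $removed")

return if (succeeded) 0 else 1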
This ends my second post about Hadoop using Kotlin. If you have any questions, or if you see that something is wrong in my explanation, don't hesitate to write to me.