The first technology on my Big Data learning schedule is Hadoop, so I'll start with the classic Hello World exercise for it, written in Kotlin. By the way, all the code from this post can be found in its GitHub repository.
The purpose of this first exercise is pretty simple: develop a MapReduce algorithm that, given a text file as input, produces a file listing, for each word, the number of times it appears in the input. First of all, what is MapReduce? Wikipedia says that "MapReduce is a programming model and an associated implementation for processing and generating large data sets with a parallel, distributed algorithm on a cluster". Put simply, from a programming point of view it comes down to two functions: Map(), which translates the input into key/value pairs that are then sorted and grouped by key, and Reduce(), which summarizes each group produced by the previous step.
For our exercise, the MapReduce flow is pretty easy: we map each word to a tuple <word, 1>, because each occurrence counts as 1; the tuples are sorted into groups that share the same word as key; and then we reduce each group by summing the 1s in it.
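To make the three phases concrete before any Hadoop code appears, here is a minimal in-memory sketch of the same flow. It is not how Hadoop runs the job: the function names (mapLine, shuffle, reduceGroup, wordCount) are made up for illustration, and it assumes a recent Kotlin (1.5 or later) for lowercase().

```kotlin
// In-memory imitation of the three phases; no Hadoop involved.
// All function names here are hypothetical and exist only for this sketch.

// Map: turn one line of text into <word, 1> pairs.
fun mapLine(line: String): List<Pair<String, Int>> =
    line.split(Regex("\\W+"))
        .filter { it.isNotEmpty() }
        .map { it.lowercase() to 1 }

// Shuffle: group the emitted 1s by word and order the groups by key.
fun shuffle(pairs: List<Pair<String, Int>>): Map<String, List<Int>> =
    pairs.groupBy({ it.first }, { it.second }).toSortedMap()

// Reduce: collapse one group into <word, count>.
fun reduceGroup(word: String, ones: List<Int>): Pair<String, Int> =
    word to ones.sum()

// Chains the phases over all the input lines.
fun wordCount(lines: List<String>): List<Pair<String, Int>> =
    shuffle(lines.flatMap(::mapLine)).map { (word, ones) -> reduceGroup(word, ones) }
```

For example, wordCount(listOf("Hello Hadoop", "hello Kotlin, hello world")) returns the pairs (hadoop, 1), (hello, 3), (kotlin, 1), (world, 1).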
First, the project settings. I'm using a build.gradle to configure the entire project...
```groovy
version '1.0'

buildscript {
    ext.kotlin_version = '1.0.6'
    ext.hadoop_version = '2.7.3'

    repositories {
        mavenCentral()
    }

    dependencies {
        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
    }
}

allprojects {
    apply plugin: 'kotlin'
    apply plugin: 'java'
    sourceCompatibility = 1.7

    repositories {
        mavenCentral()
    }

    configurations {
        provide
        compile.extendsFrom provide
    }

    dependencies {
        provide "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
        testCompile group: 'junit', name: 'junit', version: '4.12'
    }

    jar {
        from {
            // in this way the kotlin.stdlib is added to the jar file
            configurations.provide.collect {
                it.isDirectory() ? it : zipTree(it)
            }
        }
    }
}
```
... and another build.gradle for the specific exercise.
```groovy
dependencies {
    compile "org.apache.hadoop:hadoop-client:$hadoop_version"
}
```
They are quite standard Gradle configuration files; the only important thing to notice is the jar task at the end of the first one. It is needed to include kotlin-stdlib in the generated jar, so that Hadoop can run the example. I haven't found a better way to do it, so if anybody has any advice about this, it would be really helpful.
Now, the mapper code. It must be a class that extends org.apache.hadoop.mapreduce.Mapper and overrides the map(..) method.
```kotlin
import extensions.split
import extensions.toIntWritable
import extensions.toText
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Mapper
import java.util.regex.Pattern

/**
 * Represents the Mapper class used for this example.
 * The generic types correspond to <InputKey, InputValue, OutputKey, OutputValue>
 */
class MapperBigData : Mapper<LongWritable, Text, Text, IntWritable>() {

    override fun map(key: LongWritable, value: Text, context: Context) {
        // Splits each sentence in a list of words.
        val words = value.split(Pattern.compile("\\W+"))
        words.map(String::toLowerCase)
                .forEach { context.write(it.toText(), 1.toIntWritable()) }
    }
}
```
The input text is inside value, so we split it into words, convert each word to lowercase, and write it to the context as a key with value 1. The sorting on the key, the first argument passed to context.write(..), is done entirely by Hadoop, but I wouldn't be surprised if there is a way to influence it.
As with the map part, the reducer class must extend org.apache.hadoop.mapreduce.Reducer and override reduce(..). Inside reduce, values is the group of all the values emitted for the word in key. We sum the values and write the pair <word, sum(values)>.
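One detail of splitting on \W+ is worth a quick look: when a line starts or ends with punctuation or whitespace, Kotlin's standard split leaves an empty string at that edge. The post's own split helper for Text isn't shown, so whether it behaves the same way is an assumption; if it does, the mapper would also emit an empty "word". A minimal sketch of a tokenizer that filters those out, using a hypothetical tokenize function and assuming Kotlin 1.5 or later for lowercase():

```kotlin
// Hypothetical helper, not part of the post's code: splits a line on runs of
// non-word characters, drops the empty tokens that can appear at the edges,
// and lowercases what is left.
fun tokenize(line: String): List<String> =
    line.split(Regex("\\W+"))
        .filter { it.isNotEmpty() }
        .map { it.lowercase() }
```

With this, tokenize(" Hello, world!") gives [hello, world], whereas the unfiltered split of the same line starts with an empty string.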
```kotlin
import extensions.toIntWritable
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Reducer

/**
 * Represents the Reducer class used for this example.
 * The generic types correspond to <InputKey, InputValue, OutputKey, OutputValue>
 */
class ReducerBigData : Reducer<Text, IntWritable, Text, IntWritable>() {

    override fun reduce(key: Text, values: Iterable<IntWritable>, context: Context) {
        // For each word returns the number of times it appeared in the input text.
        context.write(key, values.sumBy(IntWritable::get).toIntWritable())
    }
}
```
This file, instead, contains a couple of Kotlin extension methods for Hadoop. I've been pretty much obsessed with this concept lately, so I couldn't resist writing some; who knows, maybe I'll end up writing a Kotlin library for Hadoop! (The split, toText and toIntWritable helpers imported by the mapper and the reducer are not part of this listing.)
```kotlin
package extensions

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.*
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

fun Job.addPaths(inputPath: Path, outputPath: Path) {
    FileInputFormat.addInputPath(this, inputPath)
    FileOutputFormat.setOutputPath(this, outputPath)
}

inline fun <reified T : Any> Job.setJarByClass() {
    this.setJarByClass(T::class.java)
}

inline fun <reified T : InputFormat<*, *>> Job.setInputFormatClass() {
    // Block body (no "=" before the brace), so the assignment runs when called.
    this.inputFormatClass = T::class.java
}

inline fun <reified T : OutputFormat<*, *>> Job.setOutputFormatClass() {
    this.outputFormatClass = T::class.java
}

inline fun <reified T : Mapper<*, *, *, *>> Job.setMapperClass() {
    this.mapperClass = T::class.java
}

inline fun <reified T : Reducer<*, *, *, *>> Job.setReducerClass(numReducers: Int) {
    this.reducerClass = T::class.java
    this.numReduceTasks = numReducers
}

inline fun <reified KeyClass : Any, reified ValueClass : Any> Job.mapOutput() {
    this.mapOutputKeyClass = KeyClass::class.java
    this.mapOutputValueClass = ValueClass::class.java
}

inline fun <reified KeyClass : Any, reified ValueClass : Any> Job.reducerOutput() {
    this.outputKeyClass = KeyClass::class.java
    this.outputValueClass = ValueClass::class.java
}
```
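A Kotlin detail that is easy to trip over when writing setter-style extensions like these: a function declared as `fun f() = { ... }` has an expression body whose value is a lambda, so calling it only builds the lambda and never runs the code inside. With Hadoop such a slip can go unnoticed when the value being set happens to be the default, as TextInputFormat is for the input format. The sketch below shows the difference with a stand-in class; FakeJob and both function names are invented for illustration and have nothing to do with Hadoop's API.

```kotlin
// Stand-in for Hadoop's Job, only to keep the sketch self-contained.
class FakeJob {
    var inputFormat: String = "default"
}

// Expression body whose value is a lambda: calling this function only
// creates the lambda, the assignment inside does not run.
fun FakeJob.setFormatLazily(name: String) = { this.inputFormat = name }

// Block body: the assignment runs as soon as the function is called.
fun FakeJob.setFormat(name: String) {
    this.inputFormat = name
}
```

Calling job.setFormatLazily("text") leaves inputFormat at "default" unless the returned lambda is invoked too, as in job.setFormatLazily("text")(), while job.setFormat("text") changes it right away.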
Last step: the driver class, the one that configures everything. I still have to dig into the Hadoop documentation, but I imagine it is used mostly to set where to find the input file or files, where to save the results, the mapper and reducer configuration, and other extra settings. Here, all the extension methods for the Job class make everything more readable, in my opinion.
```kotlin
import extensions.*
import org.apache.hadoop.conf.Configured
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.util.Tool

/**
 * Hello World for Big Data with Hadoop
 */
class DriverBigData : Configured(), Tool {

    override fun run(args: Array<out String>): Int {
        // parsing of the input parameters
        if (args.size != 3) {
            error("Expected <#reducers> <input_path> <output_path>, but received ${args.size} elements.")
        }
        val numReducers = args[0].toInt()
        val inputPath = Path(args[1])
        val outputPath = Path(args[2])

        // Defines a new job
        with(Job.getInstance(this.conf)) {
            jobName = "Big Data - Hello World"

            // Sets the path for input file/folder and the output folder
            addPaths(inputPath, outputPath)

            // Specifies the class of the Driver for this job
            setJarByClass<DriverBigData>()

            // Specifies the job's format for input and output
            setInputFormatClass<TextInputFormat>()
            setOutputFormatClass<TextOutputFormat<Text, IntWritable>>()

            // Specifies the mapper class and its key:value output
            setMapperClass<MapperBigData>()
            mapOutput<Text, IntWritable>()

            // Specifies the reducer class and its key:value output
            setReducerClass<ReducerBigData>(numReducers)
            reducerOutput<Text, IntWritable>()

            return if (waitForCompletion(true)) 0 else 1
        }
    }
}
```
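The first argument of the driver sets how many reduce tasks run. With more than one, every occurrence of a word has to reach the same reducer for the counts to come out right, and the result is written as one output file per reducer. A minimal sketch of how that assignment can work, assuming the job keeps Hadoop's default HashPartitioner (the driver above does not set another one); partitionFor is a hypothetical name, and String.hashCode() is used here as a stand-in for Text.hashCode(), so the actual reducer numbers on a cluster will differ.

```kotlin
// Mirrors the formula of Hadoop's default HashPartitioner:
//   (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
// Assumption: String.hashCode() replaces Text.hashCode() to keep the sketch
// free of Hadoop dependencies, so only the principle carries over.
fun partitionFor(word: String, numReducers: Int): Int {
    require(numReducers > 0) { "numReducers must be positive" }
    // Masking with Int.MAX_VALUE clears the sign bit, so the result is never negative.
    return (word.hashCode() and Int.MAX_VALUE) % numReducers
}
```

The same word always maps to the same index between 0 and numReducers - 1, which is why a single reducer can safely sum all the 1s for a given word.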
So, I think this wraps up this Hadoop Hello World and my first blog post. I hope everything makes sense, and if you have questions or comments, feel free to write to me!