In this post, I'm going to show some basic options that Hadoop offers for improving a MapReduce job, as always using Kotlin as the programming language. This time, the example is the analysis of sensor-generated data, available in two different formats. Moreover, we will produce two different kinds of output: one with the data from sensors that reported a temperature above a certain threshold, and another with the data where the temperature is below that threshold. As always, all the code is available in this repository.
To sum up, these are the concepts presented in this post:
- Map-only Jobs.
- How to handle Multiple Inputs and Multiple Outputs in Hadoop.
- The setup() and cleanup() methods of a Mapper.
- How to define custom data types as Mapper/Reducer values.
Let's start with the first two points. A Map-only job is a MapReduce program where the Reduce phase is missing, and the output of the job is the output of the Map phase. To obtain a Map-only job, it is enough to set the number of reducers to 0, as done in SensorsDriver.kt below with numReduceTasks = 0.
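Under the hood this is just the standard Hadoop API; a minimal sketch of the two equivalent forms:

// Plain Hadoop API: disable the reduce phase entirely
job.setNumReduceTasks(0)
// In Kotlin, the getter/setter pair is exposed as a property
job.numReduceTasks = 0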
Handling multiple inputs is quite simple: the extension method addMultipleInputPath() accepts the input path of a file or a folder, plus the types of the InputFormat and of the Mapper that handle this input. Multiple outputs are, instead, a bit more complicated. They don't represent different folders where to put the output files, but the prefixes used to distinguish the different kinds of output. First, it is necessary to call the extension method addMultipleNamedOutput() to set the prefix of the output files to create, the type of OutputFormat to use, and its Key and Value types (see the two addMultipleNamedOutput() calls in SensorsDriver.kt below).
package common.hadoop.extensions

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.OutputFormat
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs

// Binds an input path to the InputFormat and the Mapper that will process it.
inline fun <reified T : InputFormat<*, *>, reified M : Mapper<*, *, *, *>> Job.addMultipleInputPath(path: Path) {
    MultipleInputs.addInputPath(this, path, T::class.java, M::class.java)
}

// Registers a named output with its OutputFormat and its Key/Value types.
inline fun <reified T : OutputFormat<Key, Value>, reified Key : Any, reified Value : Any> Job.addMultipleNamedOutput(namedOutput: String) {
    MultipleOutputs.addNamedOutput(this, namedOutput, T::class.java, Key::class.java, Value::class.java)
}
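Thanks to the reified type parameters, the call sites stay free of ::class.java noise. For comparison, this is roughly what the first extension call in the driver expands to:

// Equivalent call against the raw Hadoop API
MultipleInputs.addInputPath(job, firstInput, KeyValueTextInputFormat::class.java, SensorsMapperType1::class.java)
// versus the extension:
job.addMultipleInputPath<KeyValueTextInputFormat, SensorsMapperType1>(firstInput)

Here is the complete SensorsDriver.kt: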
import ...

val HIGH_TEMP = "hightemp"
val LOW_TEMP = "lowtemp"
val THRESHOLD = "threshold"

class SensorsDriver : Configured(), Tool {

    override fun run(args: Array<out String>): Int {
        if (args.size != 4) {
            error("Expected <input_path1> <input_path2> <output_path> <temp_threshold>, but received " +
                    "${args.size} elements.")
        }

        val firstInput = Path(args[0])
        val secInput = Path(args[1])
        val outputPath = Path(args[2])
        val highTemp = args[3]

        // Store the threshold in the configuration, so the mappers can read it.
        val config = this.conf.apply {
            set(THRESHOLD, highTemp)
        }

        with(Job.getInstance(config)) {
            jobName = "Big Data - Sensors Analysis"
            setJarByClass<SensorsDriver>()

            // One mapper per input format.
            addMultipleInputPath<KeyValueTextInputFormat, SensorsMapperType1>(firstInput)
            addMultipleInputPath<TextInputFormat, SensorsMapperType2>(secInput)
            addOutputPath(outputPath)

            // One named output per temperature class.
            addMultipleNamedOutput<TextOutputFormat<Text, SensorData>, Text, SensorData>(HIGH_TEMP)
            addMultipleNamedOutput<TextOutputFormat<Text, SensorData>, Text, SensorData>(LOW_TEMP)

            // Map-only job: no reduce phase.
            numReduceTasks = 0

            return if (waitForCompletion(true)) 0 else 1
        }
    }
}
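Since SensorsDriver implements Tool, it is meant to be launched through ToolRunner. The entry point isn't shown here; a minimal sketch of what it could look like (the repository may differ):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.ToolRunner
import kotlin.system.exitProcess

fun main(args: Array<String>) {
    // ToolRunner parses the generic Hadoop options (-D, -files, ...) before calling run().
    exitProcess(ToolRunner.run(Configuration(), SensorsDriver(), args))
}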
The two mappers follow. A MultipleOutputs instance is created in setup(), which Hadoop invokes once before the first call to map(), and released in cleanup(), which is invoked once after the last one; this way the instance is created once per task instead of once per record. The MultipleOutputs.write() method works exactly like the Context.write() method used in the previous examples, except that it also requires the name of the output to write to.
import ...

/*
 Input format is of type:
 <s#id>\t<date>,<temp>
 */
class SensorsMapperType1 : Mapper<Text, Text, Text, SensorData>() {

    lateinit var multipleOutputs: MultipleOutputs<Text, SensorData>

    override fun setup(context: Context) {
        // Created once per task, before the first call to map().
        multipleOutputs = MultipleOutputs(context)
    }

    override fun map(key: Text, value: Text, context: Context) {
        // KeyValueTextInputFormat already split the line on '\t':
        // key = sensor id, value = "<date>,<temp>".
        val values = value.toString().split(",")
        val date = values[0]
        val temp = values[1].toFloat()
        val sensorData = SensorData(date, temp)

        val threshold = context.configuration.get(THRESHOLD).toFloat()
        if (temp >= threshold) {
            multipleOutputs.write(HIGH_TEMP, key, sensorData)
        } else {
            multipleOutputs.write(LOW_TEMP, key, sensorData)
        }
    }

    override fun cleanup(context: Context) {
        // Flushes and closes all the named outputs.
        multipleOutputs.close()
    }
}
import ...

/*
 Input format is of type:
 <date>,<temp>,<s#id>
 */
class SensorsMapperType2 : Mapper<LongWritable, Text, Text, SensorData>() {

    lateinit var multipleOutputs: MultipleOutputs<Text, SensorData>

    override fun setup(context: Context) {
        multipleOutputs = MultipleOutputs(context)
    }

    override fun map(key: LongWritable, value: Text, context: Context) {
        // TextInputFormat provides the whole line as value; split it manually.
        val values = value.toString().split(",")
        val date = values[0]
        val temp = values[1].toFloat()
        val sensorID = values[2]
        val sensorData = SensorData(date, temp)

        val threshold = context.configuration.get(THRESHOLD).toFloat()
        if (temp >= threshold) {
            multipleOutputs.write(HIGH_TEMP, sensorID.toText(), sensorData)
        } else {
            multipleOutputs.write(LOW_TEMP, sensorID.toText(), sensorData)
        }
    }

    override fun cleanup(context: Context) {
        multipleOutputs.close()
    }
}
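After a run, each named output produces its own files, prefixed with the name passed to write(); since this is a map-only job, the suffix is -m-<task id>. The standard part files are still created by Hadoop, but stay empty here because nothing is written through the Context. The output folder should therefore look roughly like this (file counts depend on the number of map tasks):

output/
  _SUCCESS
  hightemp-m-00000
  lowtemp-m-00000
  part-m-00000

Finally, SensorData is the custom value type exchanged between the mappers and the output files: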
import org.apache.hadoop.io.Writable
import java.io.DataInput
import java.io.DataOutput

// Default values give the no-argument constructor that Hadoop needs
// to instantiate the type via reflection before calling readFields().
data class SensorData(var date: String = "", var temp: Float = 0.0F) : Writable {

    override fun readFields(dataIn: DataInput) {
        // Fields must be read in the same order they were written.
        date = dataIn.readUTF()
        temp = dataIn.readFloat()
    }

    override fun write(dataOut: DataOutput) {
        dataOut.writeUTF(date)
        dataOut.writeFloat(temp)
    }
}
Before ending this post, I want to share some thoughts on using Kotlin for this example:
- I was hoping to write addMultipleNamedOutput<TextOutputFormat, Text, SensorData>, but Kotlin doesn't allow it. If you are interested, I asked on Stack Overflow why that is.
- Kotlin data classes are good candidates for implementing the Writable and WritableComparable interfaces, providing equals()/hashCode(), toString() and copy() for free. The only limitation is that the attributes can't be immutable: the actual initialization happens in readFields(), after the object has already been constructed, so val attributes are not possible. Still, in my opinion, getting those methods for free outweighs the loss of immutability, which makes data classes preferable to traditional classes here; see the sketch below.
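For instance, the same pattern extends to keys. A hypothetical SensorKey (not part of the repository) implementing WritableComparable could look like this:

import org.apache.hadoop.io.WritableComparable
import java.io.DataInput
import java.io.DataOutput

// Hypothetical composite key, ordered by sensor id and then by date.
// equals()/hashCode(), toString() and copy() come for free from the data class.
data class SensorKey(var sensorId: String = "", var date: String = "") : WritableComparable<SensorKey> {

    override fun readFields(dataIn: DataInput) {
        sensorId = dataIn.readUTF()
        date = dataIn.readUTF()
    }

    override fun write(dataOut: DataOutput) {
        dataOut.writeUTF(sensorId)
        dataOut.writeUTF(date)
    }

    override fun compareTo(other: SensorKey): Int =
            compareValuesBy(this, other, { it.sensorId }, { it.date })
}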
With these conclusions, I end my third blog post about Hadoop using Kotlin. As always, if you have suggestions or criticisms, don't hesitate to comment here or to write to me privately.