Tuesday, May 30, 2017

Let's make Apache Spark more Kotlinesque

I read Thomas Nield's blog post about Kotlin and Apache Spark around the end of last year. At the time, I had a basic knowledge of Kotlin and only knew what Apache Spark was, with no idea how to use it. That post convinced me to start digging a bit more into the Big Data world and, more importantly, to start writing about Big Data using Kotlin. So, if you're reading, thank you, Thomas!!

It has been a month now since I started learning Apache Spark, and it is easy to understand why it is the natural successor of Hadoop. First of all, Spark replaces the Hadoop MapReduce paradigm with Transformations and Actions on an RDD (Resilient Distributed Dataset). As the name suggests, an RDD represents the dataset we want to work with, loaded in memory and distributed across multiple nodes of a cluster. On an RDD, it is possible to perform multiple transformations, in order to modify its content, and to use actions for collecting the results of those transformations. While most of the transformations and actions in Spark can be implemented with the MapReduce paradigm in Hadoop, Spark improves on two well-known limitations of the latter: the coding experience and the overall performance. The former is solved thanks to the many built-in transformations and actions, which no longer require manual coding in the form of a Mapper or a Reducer. The latter is improved by keeping all the operations in main memory, avoiding the writing to and reading from disk needed between multiple MapReduce jobs. Moreover, the loading of an RDD into memory and the transformations performed on it are lazily evaluated: until an action is called on the RDD, nothing is actually computed. This also makes it possible, behind the scenes, to optimize those transformations.
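
Just to give an idea of what this looks like in practice, here is a minimal sketch using the Java API from Kotlin (the local master and the input.txt path are placeholders, not part of the exercise):

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

fun main(args: Array<String>) {
    val conf = SparkConf().setAppName("rdd-sketch").setMaster("local[*]")
    val sc = JavaSparkContext(conf)
    // Transformations are lazy: nothing is computed on this line.
    val longLines = sc.textFile("input.txt").filter { it.length > 80 }
    // The action is what triggers the evaluation of the whole chain.
    println("Long lines: ${longLines.count()}")
    sc.stop()
}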

In his blog post, Thomas shows how it is possible to create an Apache Spark project and use its Java API with Kotlin. In this blog post, I want to extend the discussion, showing why, in my opinion, Kotlin should be preferred over Java, with some examples of how to improve the Java API and the general user experience with Spark. You can find all the code related to this post here, where there's also a brief explanation of the exercise solved.

First of all, the possibility to use lambda expressions instead of anonymous classes significantly improves code readability. Even if lambdas are present in Java 8 too, I believe that the possibility in Kotlin to use the implicit it parameter instead of declaring the lambda parameter explicitly, together with the parameter destructuring (line 28) introduced in Kotlin 1.1, makes the general experience of using lambdas a bit more enjoyable.
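
To give an idea, something along these lines is possible. Note that the two operator extensions on scala.Tuple2 are an assumption of mine, added here only to make destructuring work on Spark's pair type:

import org.apache.spark.api.java.JavaPairRDD
import scala.Tuple2

// Assumed helpers: expose component1()/component2() so a Tuple2 can be destructured.
operator fun <A, B> Tuple2<A, B>.component1(): A = _1()
operator fun <A, B> Tuple2<A, B>.component2(): B = _2()

fun describe(counts: JavaPairRDD<String, Int>): List<String> =
    counts
        // `it` replaces an explicitly declared lambda parameter.
        .filter { it._2() > 0 }
        // Destructuring (Kotlin 1.1) gives a name to each half of the tuple.
        .map { (word, count) -> "$word appears $count times" }
        .collect()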

Extension methods are another point in favor of Kotlin. Lines 25 and 54 show two ways of improving the Java API, making it more Kotlinesque. As shown in the code example below, the first extension is used to make the creation of a scala.Tuple2 similar to the creation of a Kotlin Pair, using the infix notation. I decided to use tuple instead of to only to avoid confusion between the two methods, but the name absolutely needs improving!
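
A minimal version of such an extension could look like this (the real one lives in the repository):

import scala.Tuple2

// Infix factory so that a scala.Tuple2 can be built like a Kotlin Pair: "word" tuple 1
infix fun <A, B> A.tuple(other: B): Tuple2<A, B> = Tuple2(this, other)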

The second extension is, instead, a typical Kotlin extension method: it creates a Spark-like method that, instead of returning a Java mutable map, embraces the immutable collections present in the Kotlin standard library.
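
As an example of the idea, wrapping a call like collectAsMap() could look like the following; the method name and the Spark call wrapped here are my choice for illustration, not necessarily the one used in the repository:

import org.apache.spark.api.java.JavaPairRDD

// Collects the pair RDD and exposes the result as a read-only Kotlin Map
// instead of the mutable java.util.Map returned by Spark.
fun <K, V> JavaPairRDD<K, V>.collectAsKotlinMap(): Map<K, V> = collectAsMap().toMap()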

And talking about extension methods and the standard library, lines 15 and 19 show two methods already available in Kotlin. The former is with(), which allows referencing the SparkConf instance with the this keyword where necessary, while the latter is use(), the Kotlin version of the Java try-with-resources, used with the JavaSparkContext instance.
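
Put together, the two look roughly like this (the app name and the local master are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

fun lineLengths(path: String): List<Int> {
    // with() lets us configure the SparkConf referring to it as `this`.
    val conf = with(SparkConf()) {
        setAppName("Kotlinesque Spark")
        setMaster("local[*]")
    }
    // use() closes the context at the end, like a Java try-with-resources.
    return JavaSparkContext(conf).use { sc ->
        sc.textFile(path).map { it.length }.collect()
    }
}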

Conclusions
Apache Spark is written in Scala, so Scala will probably remain the best way to write Spark code. However, I think that at this point Kotlin can be considered a better choice than Java for using Spark, also considering how the community is going to grow in the next couple of years, thanks to Kotlin Native and the Android support.
This is just a brief overview of how it is possible to improve the Java experience of Apache Spark using Kotlin, and I'm planning to write more about this subject. If you have any questions, suggestions or criticism, don't hesitate to comment here or contact me.

Monday, April 17, 2017

Multiple Inputs, Outputs and Personalized Datatype in Hadoop

Wow, nearly two months since my last post!! My thesis needed more time than I expected, but now it's over, so I'll probably have more time to dedicate to the blog (hopefully).

In this post, I'm going to show some basic options that Hadoop offers for improving a MapReduce job, as always using Kotlin as the programming language. This time, the example is the analysis of sensor-generated data, available in two different formats. Moreover, we will produce two different kinds of output: one with the sensor readings that reported a temperature over a certain threshold, and one where the temperature is under that threshold. As always, all the code is available in this repository.

To sum up, these are the concepts presented in this post:
  1. Map-only Jobs.
  2. How to handle Multiple Inputs and Multiple Outputs in Hadoop.
  3. The setup() and the cleanup() method of a Mapper.
  4. How to define personalized datatypes as Mapper/Reducer Values.
Let's start with the first two points. A Map-only Job is a MapReduce program where the Reduce phase is missing, and the output of the job is the one produced by the Map phase. In order to have a Map-only Job, it is enough to set the number of reducers to 0, as in line 27 of SensorsDriver.kt.
Handling multiple inputs is quite simple: the extension method addMultipleInputPath() accepts the input path of a file, or a folder, and the types of InputFormat and Mapper to handle that input.
Multiple outputs are, instead, a bit more complicated. They don't represent a different folder where to put the output files, but the prefix used to identify a certain kind of output. First, it is necessary to call the extension method addMultipleNamedOutput() to set the prefix of the output files to create, the type of OutputFormat to use and its Key and Value types (look at lines 31 and 32 of SensorsDriver.kt).
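
To make the picture clearer, here is a rough sketch of how those pieces fit together in a driver; it is based on the standard MultipleInputs/MultipleOutputs API, and the prefixes, formats, value types and helper name are placeholders, not the actual SensorsDriver.kt code:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

fun configureSensorsJob(job: Job, inputType1: Path, inputType2: Path) {
    // Point 1: a Map-only job simply sets the number of reducers to 0.
    job.numReduceTasks = 0

    // Point 2a: one Mapper per input (in the real exercise the two inputs have different formats;
    // the repository wraps these calls in extension methods).
    MultipleInputs.addInputPath(job, inputType1, TextInputFormat::class.java, SensorsMapperType1::class.java)
    MultipleInputs.addInputPath(job, inputType2, TextInputFormat::class.java, SensorsMapperType2::class.java)

    // Point 2b: two named outputs identified by a prefix ("hot"/"cold" and the Text value type
    // are placeholders; the real code uses the SensorData type presented below).
    MultipleOutputs.addNamedOutput(job, "hot", TextOutputFormat::class.java, Text::class.java, Text::class.java)
    MultipleOutputs.addNamedOutput(job, "cold", TextOutputFormat::class.java, Text::class.java, Text::class.java)
}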

SensorsMapperType1 and SensorsMapperType2 are the two mappers used for handling the two types of sensor data expected. Point 3 is connected to the Multiple Outputs concept of point 2. Specifically, what is important is that the setup() and cleanup() methods are called exactly once for each mapper: the first is called right after the mapper is instantiated, while the second is called once the mapper's job is completed. What makes these methods special is that, for both of them, an instance of Context is available when they are called. In our example, the setup() method is used to create an instance of MultipleOutputs for writing to the two different output files defined in the Driver class, while the cleanup() method is used to close the output streams.
The MultipleOutputs.write() method works exactly like the Context.write() method used in previous examples, except that this time it also requires the prefix of the output file to write to.
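
The structure of one of the two mappers could look roughly like this; the threshold, the parsing logic and the Text output value are placeholders (the real code uses the SensorData type presented below):

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs

class SensorsMapperType1 : Mapper<LongWritable, Text, Text, Text>() {

    private lateinit var outputs: MultipleOutputs<Text, Text>

    // Called once, right after the mapper is instantiated.
    override fun setup(context: Context) {
        outputs = MultipleOutputs(context)
    }

    override fun map(key: LongWritable, value: Text, context: Context) {
        // Placeholder parsing: "sensorId,temperature"
        val (sensorId, temperature) = value.toString().split(",")
        val prefix = if (temperature.toDouble() > 30.0) "hot" else "cold"
        // Same as Context.write(), plus the named output (prefix) to write to.
        outputs.write(prefix, Text(sensorId), Text(temperature))
    }

    // Called once, when the mapper has processed all its input.
    override fun cleanup(context: Context) {
        outputs.close()
    }
}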

SensorData addresses point #4. In order to define serializable classes that can be used by a Hadoop program, one of these two interfaces must be implemented: Writable or WritableComparable. The only difference between the two is that the second also allows using custom classes as keys, since keys have to be comparable. What is important is that the methods readFields(), for deserialization, and write(), for serialization, are correctly implemented; in particular, the order in which the fields are written (serialized) must be the same in which they are read (deserialized).
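
A sketch of such a class, written as a Kotlin data class as discussed in the conclusions below (the fields are placeholders, not the exact ones used in the repository):

import org.apache.hadoop.io.Writable
import java.io.DataInput
import java.io.DataOutput

// var (not val) fields: readFields() has to mutate the instance after construction.
data class SensorData(var sensorId: String = "", var temperature: Double = 0.0) : Writable {

    // Serialization: the write order...
    override fun write(out: DataOutput) {
        out.writeUTF(sensorId)
        out.writeDouble(temperature)
    }

    // ...must match the read order exactly.
    override fun readFields(input: DataInput) {
        sensorId = input.readUTF()
        temperature = input.readDouble()
    }
}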

Conclusions
Before ending this post, I want to share some thoughts I had while using Kotlin for this example:
  • I was hoping to use addMultipleNamedOutput<TextOutputFormat, Text, SensorData>, but it isn't allowed by Kotlin. If you are interested, I asked on Stack Overflow why I can't do that.
  • Kotlin data classes are good candidates for implementing the Writable and WritableComparable interfaces, since they provide the methods equals()/hashCode(), toString() and copy() for free. The only limitation is that the attributes can't be immutable: the actual population of the object is done by the readFields() method, making it impossible to have val attributes. However, in my opinion, the advantage of getting those methods for free outweighs the loss of immutability, making data classes preferable to traditional classes here.

With these conclusions, I end my third blog post about Hadoop using Kotlin. As always, if you have suggestions or criticisms, don't hesitate to comment here or to write me in private.

Saturday, February 25, 2017

Combiners, Counters and Properties in Hadoop

Before starting, I would like to thank everyone who spent their time reading my first blog post about Kotlin and Hadoop, and, in particular, I would like to thank the guys @KotlinWeekly for referencing it in newsletter #29. If you haven't signed up yet, go to http://www.kotlinweekly.net/ and subscribe to their newsletter: you can find amazing articles about Kotlin every week.
I've also decided to introduce Kotlin 1.1 RC in the project, to start learning what is new in this version. As always, you can find the code of this post in its repository.

In this post, I'm going to introduce the Hadoop concept of a Combiner, together with how to define custom Counters and Properties in a MapReduce program. The example this time is the n-gram counter problem with a keyword for filtering the results. Given one or more text files as input, for each line we create multiple sequences of n or fewer words starting from the first one. We then filter out all the n-grams that start with the input keyword and count how many times each remaining sequence appears in the input.

Let's start with the easy ones, Properties and Counters. A property is an attribute of the program that we want to share across our working context; it is usually read-only, something we have no interest in changing during the execution. In our example, we will use properties to store the value n of our n-grams and the keyword to use for the filtering.
A counter, instead, is a statistic we want to keep track of while our program runs. In this case, we want to know how many n-grams we filter out because they start with our keyword, so we will increment the counter by 1 every time we discard one of them.

This is our Driver code, and lines 30 and 31 are where we define the properties for our Configuration. Because a property is a simple key:value pair, the method Configuration.set(key, value) allows us to set a property easily, while Configuration.get(key) gives the value back as a String, as shown in line 15 of the NGramMapper.
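
In short, the two calls look like this (the property names and values are placeholders):

import org.apache.hadoop.conf.Configuration

fun propertiesExample(conf: Configuration) {
    // Driver side: store the shared, read-only values.
    conf.set("ngram.size", "3")
    conf.set("ngram.keyword", "kotlin")

    // Mapper side (through context.configuration): read them back as Strings.
    val n = conf.get("ngram.size").toInt()
    val keyword = conf.get("ngram.keyword")
    println("n = $n, keyword = $keyword")
}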

The Counter code is, instead, between lines 13 and 15. In order to create a counter, it is enough to declare a public enum class inside the Driver class. This counter starts from 0 and is incremented in the Combiner and the Reducer classes. To improve the readability of incrementing a counter, I have implemented a couple of extension methods for the Counter class. The operator keyword in Kotlin allows extending a class with the methods inc() and plusAssign(value) and calling them as counter++ and counter += value, making the use of a counter more intuitive.
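
These extensions might look something like the following sketch (the real ones are in the repository):

import org.apache.hadoop.mapreduce.Counter

// counter++ : increment by one (the counter has to be stored in a var for ++ to compile).
operator fun Counter.inc(): Counter {
    increment(1)
    return this
}

// counter += value : increment by an arbitrary amount.
operator fun Counter.plusAssign(value: Long) {
    increment(value)
}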

A Combiner places itself between the Mapper and the Reducer execution. The idea is that, in order to reduce the amount of data that the Mapper sends over the network, it is possible to combine that data while it is still in memory, shrinking what is actually sent to the Reducer. A Combiner, like the Reducer, extends the org.hadoop.mapreduce.Reducer class, and its input and output key:value pairs have the same types as the Reducer's input. Considering our example, instead of sending the same n-gram multiple times, or sending an n-gram that is going to be filtered out anyway, we can combine those results in the Combiner phase, reducing the traffic on the network by counting upfront the number of times an n-gram appears, or by immediately discarding the n-grams that match our keyword.

Two things are important to know before using a Combiner. First, the function executed must be commutative and associative. Second, the execution of the Combiner is not guaranteed for all the keys generated by the Mapper, so it is important to take this into account when writing the Reducer's code. This is also why the input and output key:value types of a Combiner must be the same, matching the Mapper's output and the Reducer's input: since its execution is optional, the Reducer must be able to consume the Combiner's output exactly as it consumes the Mapper's.

[NB] I'm not really sure about this second point, so if somebody can give me more info about it, it would be incredibly appreciated.
[NB.2] The Combiner and the Reducer code are exactly the same here, but for the sake of readability I decided to write two separate classes.
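
For reference, the combining logic boils down to something like this sketch; the class name, the property name and the string-based counter (standing in for the enum declared in the Driver) are placeholders:

import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Reducer

class NGramCombiner : Reducer<Text, IntWritable, Text, IntWritable>() {

    override fun reduce(key: Text, values: Iterable<IntWritable>, context: Context) {
        val keyword = context.configuration.get("ngram.keyword")
        if (key.toString().startsWith(keyword)) {
            // Discard the n-gram early and keep track of how many we drop.
            context.getCounter("NGRAM", "FILTERED").increment(1)
            return
        }
        // Sum the partial counts so a single pair per n-gram leaves this node.
        context.write(key, IntWritable(values.sumBy(IntWritable::get)))
    }
}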

At the end of the execution, the value of the counter can be printed on screen in the Driver class, but it is easier to simply look at the output of the Hadoop execution, which prints the counter result as shown in the image below.


With this, my second post about Hadoop using Kotlin comes to an end. If you have any questions, or if you see that something is wrong in my explanation, don't hesitate to write me.

Wednesday, February 8, 2017

Hello Blog World - Learning Big Data with Kotlin

Hello Blog World!! I'm planning to use this blog as a diary of what I'm learning about Big Data and programming in general, but whoever is reading, please feel free to contribute with comments, criticism and everything that comes to your mind. So, let's start with the presentation: my name is Guido, I'm finishing my master's studies in Software Engineering and I'm currently working at Linköping University in the bioinformatics field.
In my Big Data learning schedule, the first technology is Hadoop, so I'll start with a classic Hello World exercise for it, written in Kotlin. By the way, all the code from this post can be found in its GitHub repository.

The purpose of this first exercise is pretty simple: developing a MapReduce algorithm that, given a text file as input, generates a file containing, for each word, the number of times it appears in the input. First of all, what is MapReduce? Wikipedia says that "MapReduce is a programming model and an associated implementation for processing and generating large data sets with a parallel, distributed algorithm on a cluster". Put simply, from a programming point of view, it boils down to calling two functions: Map(), for translating the input into something else and sorting it, and Reduce(), for summarizing what happened in the previous step.
For our exercise, this MapReduce flow is pretty easy: we map each word into a tuple <word, 1>, because each occurrence counts as 1, the tuples are grouped by word, and then we reduce each group by counting the number of 1s in it.

First, the project's settings. I'm using a build.gradle to configure the entire project...

version '1.0'

buildscript {
    ext.kotlin_version = '1.0.6'
    ext.hadoop_version = '2.7.3'

    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
    }
}

allprojects {
    apply plugin: 'kotlin'
    apply plugin: 'java'

    sourceCompatibility = 1.7

    repositories {
        mavenCentral()
    }

    configurations {
        provide
        compile.extendsFrom provide
    }

    dependencies {
        provide "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
        testCompile group: 'junit', name: 'junit', version: '4.12'
    }

    jar {
        from {
            // in this way the kotlin.stdlib is added to the jar file
            configurations.provide.collect {
                it.isDirectory() ? it : zipTree(it)
            }
        }
    }

}

... and another build.gradle for the specific exercise.

dependencies {
    compile "org.apache.hadoop:hadoop-client:$hadoop_version"
}

They are quite standard Gradle configuration files; the only important thing to notice is the jar task in the first one. This is necessary for including the kotlin-stdlib inside the generated jar, in order to make it possible to run the example with Hadoop. I haven't found any better way to do it, so if anybody has any advice about this, it would be really helpful.

Now, the mapper code. It must be a class that extends org.apache.hadoop.mapreduce.Mapper and overrides the map(..) method.

import extensions.split
import extensions.toIntWritable
import extensions.toText
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Mapper
import java.util.regex.Pattern

/**
 * Represents the Mapper class used for this example.
 * The generic types correspond to <InputKey, InputValue, OutputKey, OutputValue>
 */
class MapperBigData : Mapper<LongWritable, Text, Text, IntWritable>() {

 override fun map(key: LongWritable, value: Text, context: Context) {
  // Splits each sentence in a list of words.
  val words = value.split(Pattern.compile("\\W+"))
  words.map(String::toLowerCase)
    .forEach { context.write(it.toText(), 1.toIntWritable()) }
 }
}

The input text is inside value, so we split it into words and, for each word, we map it to its lowercase version and write it into the context as a key with value = 1. The sorting on the first argument passed to context.write(..) is completely handled by Hadoop, but I wouldn't be surprised if there is a way to influence it.
Similarly to the map part, the reduce class must extend org.apache.hadoop.mapreduce.Reducer and override reduce(..). Inside reduce(), values is the group of all the values that share the word in key. We count all of them and return the couple <word, sum(values)>.

import extensions.toIntWritable
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Reducer

/**
 * Represents the Reducer class used for this example.
 * The generic types correspond to <InputKey, InputValue, OutputKey, OutputValue>
 */
class ReducerBigData : Reducer<Text, IntWritable, Text, IntWritable>() {

 override fun reduce(key: Text, values: Iterable<IntWritable>, context: Context) {
  // For each word returns the number of times it appeared in the input text.
  context.write(key, values.sumBy(IntWritable::get).toIntWritable())
 }
}

This file, instead, contains a couple of Kotlin extension methods for Hadoop. I've been pretty obsessed with this concept lately, so I couldn't not write some of them; who knows, maybe I'll end up writing a Kotlin library for Hadoop!!

package extensions

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.*
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

fun Job.addPaths(inputPath: Path, outputPath: Path) {
 FileInputFormat.addInputPath(this, inputPath)
 FileOutputFormat.setOutputPath(this, outputPath)
}

inline fun <reified T : Any> Job.setJarByClass() {
 this.setJarByClass(T::class.java)
}

inline fun <reified T : InputFormat<*, *>> Job.setInputFormatClass() {
 this.inputFormatClass = T::class.java
}

inline fun <reified T : OutputFormat<*, *>> Job.setOutputFormatClass() {
 this.outputFormatClass = T::class.java
}

inline fun <reified T : Mapper<*, *, *, *>> Job.setMapperClass() {
 this.mapperClass = T::class.java
}

inline fun <reified T : Reducer<*, *, *, *>> Job.setReducerClass(numReducers: Int) {
 this.reducerClass = T::class.java
 this.numReduceTasks = numReducers
}

inline fun <reified KeyClass : Any, reified ValueClass : Any> Job.mapOutput() {
 this.mapOutputKeyClass = KeyClass::class.java
 this.mapOutputValueClass = ValueClass::class.java
}

inline fun <reified KeyClass : Any, reified ValueClass : Any> Job.reducerOutput() {
 this.outputKeyClass = KeyClass::class.java
 this.outputValueClass = ValueClass::class.java
}

Last step: the driver class, the one that configures everything. I still have to dive deeper into the Hadoop documentation, but I imagine it is mostly used to set where to find the input file/files, where to save the results, the mapper and reducer configuration, and other extra stuff. Here all the extension methods for the Job class make everything more readable, in my opinion.

import extensions.*
import org.apache.hadoop.conf.Configured
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.util.Tool

/**
 * Hello World for Big Data with Hadoop
 */

class DriverBigData : Configured(), Tool {
 override fun run(args: Array<out String>): Int {
  // parsing of the input parameters
  if (args.size != 3) {
   error("Expected <#reducers> <input_path> <output_path>, but received ${args.size} elements.")
  }
  val numReducers = args[0].toInt()
  val inputPath = Path(args[1])
  val outputPath = Path(args[2])

  // Defines a new job
  with(Job.getInstance(this.conf)) {
   jobName = "Big Data - Hello World"

   // Sets the path for input file/folder and the output folder
   addPaths(inputPath, outputPath)

   // Specifies the class of the Driver for this job
   setJarByClass<DriverBigData>()

   // Specifies the job's format for input and output
   setInputFormatClass<TextInputFormat>()
   setOutputFormatClass<TextOutputFormat<Text, IntWritable>>()

   // Specifies the mapper class and its key:value output
   setMapperClass<MapperBigData>()
   mapOutput<Text, IntWritable>()

   // Specifies the reducer class and its key:value output
   setReducerClass<ReducerBigData>(numReducers)
   reducerOutput<Text, IntWritable>()

   return if (waitForCompletion(true)) 0 else 1
  }
 }
}
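
One last note: a Tool needs an entry point that invokes it through ToolRunner. A minimal sketch of such a main function could be:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.ToolRunner
import kotlin.system.exitProcess

fun main(args: Array<String>) {
    // ToolRunner parses the generic Hadoop options and then calls DriverBigData.run().
    exitProcess(ToolRunner.run(Configuration(), DriverBigData(), args))
}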

So, this wraps up the Hadoop Hello World and my first blog post. I hope everything makes sense and please, if you have questions or comments, feel free to write me!