It has been a month since I started learning Apache Spark, and it is easy to see why it is considered the natural successor of Hadoop. First of all, Spark replaces the Hadoop MapReduce paradigm with transformations and actions on an RDD (Resilient Distributed Dataset). As the name suggests, an RDD represents the dataset we want to work with, loaded in memory and distributed across multiple nodes of a cluster. On an RDD we can apply multiple transformations to modify its content, and then use actions to collect the results of those transformations. While most of Spark's transformations and actions could be implemented with the MapReduce paradigm in Hadoop, Spark improves on two well-known limitations of the latter: the coding experience and the overall performance. The former is addressed by the rich set of built-in transformations and actions, which no longer have to be hand-coded as a Mapper or a Reducer. The latter is improved by keeping all the operations in main memory, avoiding the disk writes and reads required between multiple MapReduce jobs. Moreover, the loading of an RDD and the transformations applied to it are lazily evaluated: until an action is called on the RDD, nothing is actually computed. This also allows Spark to optimize those transformations behind the scenes.
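Just to make the lazy evaluation concrete, here is a minimal sketch using the Java API from Kotlin; the master, the application name and the sample numbers are invented for the example, and nothing is computed until count() is called:

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

fun main() {
    // Hypothetical local configuration, just for running the example.
    val conf = SparkConf().setMaster("local").setAppName("Lazy RDD Example")
    JavaSparkContext(conf).use { sc ->
        val numbers = sc.parallelize((1..100).toList())
        val evenSquares = numbers
            .map { it * it }          // transformation: nothing is computed yet
            .filter { it % 2 == 0 }   // transformation: still nothing is computed
        // Only when the action is called is the whole pipeline actually executed.
        println("Count: ${evenSquares.count()}")
    }
}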
In his blog post, Thomas shows how to create an Apache Spark project and use its Java API with Kotlin. In this blog post, I want to extend the discussion and show why, in my opinion, Kotlin should be preferred over Java, with some examples of how to improve the Java API and the general user experience with Spark. You can find all the code related to this post here, together with a brief explanation of the exercise solved.
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
// The tuple and collectAsImmutableMap extension functions shown later in the post
// are assumed to be in the same package (or imported).

// Allows slicing an array with a range, e.g. args[0..1]
operator fun <T> Array<T>.get(indices: IntRange) = this.sliceArray(indices)

fun main(args: Array<String>) {
    if (args.size != 6) {
        error("Expected <iStations> <iNeighbors> <threshold> <output1> <output2> <output3>")
    }
    val (inputStations, inputNeighbors) = args[0..1]
    val threshold = args[2].toDouble()
    val (outputPart1, outputPart2, outputPart3) = args[3..5]
    with(SparkConf()) {
        setMaster("local")
        setAppName("Critical Stations Analysis - Exercise 43")
        JavaSparkContext(this).use {
            val cachedInputStations = it.textFile(inputStations).cache()
            // Part 1: fraction of critical readings (last field <= threshold) per station,
            // keeping only the stations with a fraction of at least 0.8,
            // sorted by decreasing fraction.
            cachedInputStations.mapToPair {
                val values = it.split(",")
                values[0] tuple (1.0 to if (values.last().toDouble() <= threshold) 1.0 else 0.0)
            }.reduceByKey { pair1, pair2 ->
                (pair1.first + pair2.first) to (pair1.second + pair2.second)
            }.mapValues { (total, critical) -> critical / total }
                .filter { it._2 >= 0.8 }
                .mapToPair { it._2 tuple it._1 }
                .sortByKey(false)
                .saveAsTextFile(outputPart1)
            // Part 2: the same fraction, computed per (timeslot, station) pair
            // and sorted by decreasing fraction.
            cachedInputStations.mapToPair {
                val values = it.split(",")
                val hour = values[2].toInt()
                val timeslot = (hour - hour % 4) to (hour + 3 - hour % 4)
                val value1 = (timeslot to values[0])
                value1 tuple (1.0 to if (values.last().toDouble() <= threshold) 1.0 else 0.0)
            }.reduceByKey { pair1, pair2 ->
                (pair1.first + pair2.first) to (pair1.second + pair2.second)
            }.mapToPair {
                val (total, critical) = it._2
                (critical / total) tuple it._1
            }.sortByKey(false)
                .saveAsTextFile(outputPart2)
            // Part 3: group the readings whose last field is 0 by the
            // values[1]:values[2]:values[3] key, and keep only those whose
            // neighboring stations all have a reading in the same group.
            val neighborsMap = it.textFile(inputNeighbors)
                .mapToPair {
                    // Map from station id to the list of its neighboring stations.
                    val values = it.split(",")
                    values[0] tuple values[1].split(" ")
                }.collectAsImmutableMap()
            val totalValidLines = it.sc().longAccumulator()
            cachedInputStations.filter { it.split(",").last().toInt() == 0 }
                .mapToPair {
                    val values = it.split(",")
                    "${values[1]}:${values[2]}:${values[3]}" tuple it
                }.groupByKey()
                .flatMap {
                    val readings = it._2
                    readings.filter { read ->
                        val sID = read.split(",").first()
                        val neighbors = neighborsMap.getOrDefault(sID, listOf())
                        val validReads = readings.filter {
                            neighbors.contains(it.split(",").first())
                        }
                        if (validReads.size == neighbors.size) {
                            totalValidLines.add(neighbors.size.toLong())
                            true
                        } else false
                    }.iterator()
                }.saveAsTextFile(outputPart3)
            println("Number of valid lines is ${totalValidLines.count()}")
        }
    }
}
Extension functions are another point in favor of Kotlin. The tuple infix function and the collectAsImmutableMap extension used in the snippet above show two ways of improving the Java API, making it more Kotlinesque. As shown in the code below, the first one makes the creation of a scala.Tuple2 look like the creation of a Kotlin Pair, using the infix notation, while the second one converts the java.util.Map returned by collectAsMap() into an immutable Kotlin Map. I decided to use tuple instead of to only to avoid confusion between the two methods, but the name can definitely be improved!
import scala.Tuple2

infix fun <K, V> K.tuple(value: V) = Tuple2(this, value)
import org.apache.spark.api.java.JavaPairRDD

fun <K, V> JavaPairRDD<K, V>.collectAsImmutableMap() = this.collectAsMap().toMap()
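To see the difference the two extensions make, here is a minimal, hypothetical word-count sketch (the data and the local configuration are invented, and the two extension functions above are assumed to be in scope) comparing the plain Java API with the Kotlinesque version:

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import scala.Tuple2

fun main() {
    val conf = SparkConf().setMaster("local").setAppName("Extensions Example")
    JavaSparkContext(conf).use { sc ->
        val words = sc.parallelize(listOf("spark", "kotlin", "spark"))

        // Plain Java API: explicit Tuple2 constructor and a java.util.Map result.
        val javaStyle = words.mapToPair { word -> Tuple2(word, 1) }
            .reduceByKey { a, b -> a + b }
            .collectAsMap()

        // With the extensions: infix tuple creation and a read-only Kotlin Map.
        val kotlinStyle = words.mapToPair { word -> word tuple 1 }
            .reduceByKey { a, b -> a + b }
            .collectAsImmutableMap()

        println(javaStyle)    // e.g. {spark=2, kotlin=1}
        println(kotlinStyle)  // same content, but as an immutable Kotlin Map
    }
}

Both pipelines produce the same result; the second one simply reads more like idiomatic Kotlin.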
Conclusions
Apache Spark is written in Scala, so Scala will probably remain the best way to write Spark code. However, I think that at this point Kotlin can be considered a better choice than Java for working with Spark, especially considering how the community is expected to grow in the next couple of years thanks to Kotlin Native and the Android support.
This is just a brief overview of how the Java experience of Apache Spark can be improved using Kotlin, and I'm planning to write more about this subject. If you have any questions, suggestions, or criticism, don't hesitate to comment here or contact me.