Using combineByKey in Apache Spark
Aggregating data is a fairly straightforward task, but what if you are working with a distributed data set, one that does not fit in local memory?
In this post I am going to use key-value pairs and Apache Spark’s combineByKey method to compute the average by key. Aggregating by key may seem like a trivial task, but it plays a major role in the implementation of algorithms such as KMeans, Naive Bayes, and TF-IDF. More importantly, implementing algorithms in a distributed framework such as Spark is an invaluable skill to have.
Average By Key
The example below uses data in the form of a list of key-value tuples: (key, value). I turn that list into a Resilient Distributed Dataset (RDD) with sc.parallelize, where sc is an instance of pyspark.SparkContext.
The next step is to use combineByKey to compute the sum and count for each key in data. Admittedly, using three lambda functions as arguments to combineByKey makes the code difficult to read; I will explain each lambda function in the next section. The result, sumCount, is an RDD with elements in the form (label, (sum, count)).
To compute the average by key, I use the map method to divide the sum by the count for each key.
Finally, I use the collectAsMap method to return the average by key as a dictionary.
data = sc.parallelize([(0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)])
# aggregate each key's values into a (sum, count) pair
sumCount = data.combineByKey(lambda value: (value, 1),
                             lambda x, value: (x[0] + value, x[1] + 1),
                             lambda x, y: (x[0] + y[0], x[1] + y[1]))
# divide the sum by the count to get the average for each key
averageByKey = sumCount.map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
print(averageByKey.collectAsMap())
Result:
{0: 3.0, 1: 10.0}
See here for the above example as an executable script.
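As a quick sanity check on this small data set, the same averages can be computed locally with plain Python; the snippet below is only an illustration and does not involve Spark at all:
# re-compute the averages without Spark, using the same (sum, count) idea
from collections import defaultdict

data = [(0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)]
totals = defaultdict(lambda: (0.0, 0))
for key, value in data:
    value_sum, count = totals[key]
    totals[key] = (value_sum + value, count + 1)
print({key: value_sum / count for key, (value_sum, count) in totals.items()})
# {0: 3.0, 1: 10.0}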
The combineByKey Method
In order to aggregate an RDD’s elements in parallel, Spark’s combineByKey method requires three functions, each explained in the sections below and sketched with named functions right after this list:
- createCombiner
- mergeValue
- mergeCombiners
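To make these three roles concrete, here is the example above rewritten with named functions instead of lambdas. The function names are my own, not part of the Spark API; only their positions in the call matter:
def to_combiner(value):               # createCombiner: start (sum, count) from the first value
    return (value, 1)

def merge_value(combiner, value):     # mergeValue: fold another value into (sum, count)
    return (combiner[0] + value, combiner[1] + 1)

def merge_combiners(c1, c2):          # mergeCombiners: combine per-partition (sum, count) pairs
    return (c1[0] + c2[0], c1[1] + c2[1])

sumCount = data.combineByKey(to_combiner, merge_value, merge_combiners)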
Create a Combiner
lambda value: (value, 1)
The first required argument in the combineByKey method is a function to be used as the very first aggregation step for each key. The argument of this function corresponds to the value in a key-value pair. If we want to compute the sum and count using combineByKey, then we can create this “combiner” to be a tuple in the form of (sum, count). The very first step in this aggregation is then (value, 1), where value is the first RDD value that combineByKey comes across and 1 initializes the count.
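For instance, with the data above the first value encountered for key 0 might be 2.0 (which value arrives first depends on partitioning), so the initial combiner for that key would be:
create_combiner = lambda value: (value, 1)
create_combiner(2.)   # -> (2.0, 1): the running sum starts at 2.0, the count at 1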
Merge a Value
lambda x, value: (x[0] + value, x[1] + 1)
The next required function tells combineByKey what to do when a combiner is given a new value. The arguments to this function are a combiner and a new value. Since the combiner is defined above as a tuple in the form of (sum, count), we merge the new value by adding it to the first element of the tuple and incrementing the second element of the tuple by 1.
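Continuing the key-0 example, merging the next value 4.0 into the combiner (2.0, 1) produces a running sum of 6.0 over a count of 2:
merge_value = lambda x, value: (x[0] + value, x[1] + 1)
merge_value((2., 1), 4.)   # -> (6.0, 2)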
Merge Two Combiners
lambda x, y: (x[0] + y[0], x[1] + y[1])
The final required function tells combineByKey how to merge two combiners. In this example, with tuples in the form of (sum, count) as combiners, all we need to do is add the sums together and add the counts together.
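This step exists because a key’s values may be spread across partitions: each partition builds its own combiner, and Spark then merges them. For key 1, one partition might contribute (10.0, 2) and another (20.0, 1), giving a combined (30.0, 3):
merge_combiners = lambda x, y: (x[0] + y[0], x[1] + y[1])
merge_combiners((10., 2), (20., 1))   # -> (30.0, 3), i.e. sum 30.0 over 3 values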
Compute the Average
averageByKey = sumCount.map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
Ultimately the goal is to compute the average by key. The result from combineByKey is an RDD with elements in the form (label, (sum, count)), so the average by key can easily be obtained by using the map method, mapping (sum, count) to sum / count.
Note: I do not use sum as a variable name in the code because it is a built-in function in Python.
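An equivalent formulation uses the mapValues method, which applies a function to the value of each pair and leaves the key untouched, avoiding the need to index into the full (label, (sum, count)) tuple:
averageByKey = sumCount.mapValues(lambda sum_count: sum_count[0] / sum_count[1])
print(averageByKey.collectAsMap())   # {0: 3.0, 1: 10.0}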
Learn More
To learn more about Spark and programming with key-value pairs in Spark, see:
- Spark Documentation Overview
- Spark Programming Guide
- O’Reilly: Learning Spark, Chapter 4
- PySpark Documentation
For an example of using the above calculation in a PySpark implementation of KMeans, see: