Aggregating data is a fairly straightforward task, but what if you are working with a distributed data set, one that does not fit in local memory?

In this post I am going to make use of key-value pairs and Apache Spark’s `combineByKey` method to compute the average-by-key. Aggregating by key may seem like a trivial task, but it happens to play a major role in the implementation of algorithms such as KMeans, Naive Bayes, and TF-IDF. More importantly, implementing algorithms in a distributed framework such as Spark is an invaluable skill to have.

### Average By Key

The example below uses data in the form of a list of key-value tuples: `(key, value)`. I turn that list into a Resilient Distributed Dataset (RDD) with `sc.parallelize`, where `sc` is an instance of `pyspark.SparkContext`.
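
The snippets in this post assume that `sc` already exists, as it does in the `pyspark` shell. If you are running the code as a standalone script, one minimal way to create it is sketched below; the master URL and application name are only placeholders.

``````
from pyspark import SparkContext

# Placeholder master URL and application name; adjust for your environment.
sc = SparkContext("local[*]", "average-by-key")
``````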

The next step is to use `combineByKey` to compute the sum and count for each key in `data`. Admittedly, using three lambda functions as arguments to `combineByKey` makes the code difficult to read, so I will explain each one in the next section. The result, `sumCount`, is an RDD whose elements are pairs of the form `(label, (sum, count))`.

To compute the average-by-key, I use the `map` method to divide the sum by the count for each key.

Finally, I use the `collectAsMap` method to return the average-by-key as a dictionary.

``````
data = sc.parallelize([(0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)])

sumCount = data.combineByKey(lambda value: (value, 1),
                             lambda x, value: (x[0] + value, x[1] + 1),
                             lambda x, y: (x[0] + y[0], x[1] + y[1]))

def average(label_and_sum_count):
    label, (value_sum, count) = label_and_sum_count
    return (label, value_sum / count)

averageByKey = sumCount.map(average)

print(averageByKey.collectAsMap())
``````

Result:

``````
{0: 3.0, 1: 10.0}
``````

See here for the above example as an executable script.

### The combineByKey Method

In order to aggregate an RDD’s elements in parallel, Spark’s `combineByKey` method requires three functions:

• `createCombiner`
• `mergeValue`
• `mergeCombiners`

#### Create a Combiner

``````
lambda value: (value, 1)
``````

The first required argument in the `combineByKey` method is a function to be used as the very first aggregation step for each key. The argument of this function corresponds to the value in a key-value pair. If we want to compute the sum and count using `combineByKey`, then we can create this “combiner” to be a tuple in the form of `(sum, count)`. The very first step in this aggregation is then `(value, 1)`, where `value` is the first value `combineByKey` encounters for a given key (within a partition) and `1` initializes the count.
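
As a plain-Python sketch (no Spark required), here is what that first step looks like for key `0` in the example data; the function name below is just for illustration:

``````
def create_combiner(value):
    # Start the per-key accumulator as (running_sum, running_count).
    return (value, 1)

create_combiner(2.)  # -> (2.0, 1)
``````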

#### Merge a Value

``````
lambda x, value: (x[0] + value, x[1] + 1)
``````

The next required function tells `combineByKey` what to do when a combiner is given a new value. The arguments to this function are a combiner and a new value. Since the combiner is a tuple in the form of `(sum, count)`, we merge a new value by adding it to the first element of the tuple (the running sum) and adding `1` to the second element (the count).
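
Continuing the plain-Python sketch for key `0`, merging the next value `4.0` into the combiner `(2.0, 1)` works like this:

``````
def merge_value(combiner, value):
    running_sum, count = combiner
    return (running_sum + value, count + 1)

merge_value((2.0, 1), 4.0)  # -> (6.0, 2): sum is 6.0, count is 2
``````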

#### Merge two Combiners

``````
lambda x, y: (x[0] + y[0], x[1] + y[1])
``````

The final required function tells `combineByKey` how to merge two combiners. Spark builds combiners independently for each partition of the data, so the partial results must eventually be merged. With tuples in the form of `(sum, count)`, all we need to do is add the two sums together and add the two counts together.
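
For example, if the values for key `1` happened to land in two different partitions, one partition might produce the combiner `(10.0, 2)` and the other `(20.0, 1)`; merging them yields the totals for the whole key:

``````
def merge_combiners(a, b):
    # Add the partial sums and the partial counts element-wise.
    return (a[0] + b[0], a[1] + b[1])

merge_combiners((10.0, 2), (20.0, 1))  # -> (30.0, 3), so the average is 10.0
``````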

#### Compute the Average

``````
def average(label_and_sum_count):
    label, (value_sum, count) = label_and_sum_count
    return (label, value_sum / count)

averageByKey = sumCount.map(average)
``````

Ultimately the goal is to compute the average-by-key. The result from `combineByKey` is an RDD with elements in the form `(label, (sum, count))`, so the average-by-key can easily be obtained by using the `map` method with a small helper that maps each `(sum, count)` to `sum / count`.
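
An equivalent way to write this step is with the RDD’s `mapValues` method, which applies a function to the value part of each pair and leaves the key untouched; either version produces the same dictionary:

``````
# Same computation, written with mapValues instead of map.
averageByKey = sumCount.mapValues(lambda sum_count: sum_count[0] / sum_count[1])
``````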

Note: I do not use `sum` as a variable name in the code because it is a built-in function in Python.