Spark combineByKey Explained with 5 Examples

The combineByKey function in Apache Spark is a powerful transformation used on RDDs of key-value pairs. It allows you to aggregate values by key efficiently by defining three functions:

  1. createCombiner – Builds the initial accumulator (the "combiner") from the first value seen for a key within a partition.
  2. mergeValue – Folds each subsequent value for that key into the accumulator, within the same partition.
  3. mergeCombiners – Merges the partial accumulators produced by different partitions.

It is most useful for custom aggregations where the result type differs from the value type (something reduceByKey cannot express, since its result must have the value type), or where groupByKey would be wasteful because it shuffles every raw value across the network.
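
For reference, the relevant overload in Spark's PairRDDFunctions has this shape (simplified; V is the RDD's value type and C is the combiner type you choose):

def combineByKey[C](
  createCombiner: V => C,       // build the initial accumulator from a key's first value
  mergeValue: (C, V) => C,      // fold another value into an accumulator (same partition)
  mergeCombiners: (C, C) => C   // merge accumulators from different partitions
): RDD[(K, C)]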


Example 1: Calculating Average per Key

In this example, we calculate the average value per key.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("CombineByKeyExample").setMaster("local[*]")
val sc = new SparkContext(conf)

val data = sc.parallelize(Seq(("A", 10), ("B", 20), ("A", 30), ("B", 40), ("A", 50)))

val combined = data.combineByKey(
  (value) => (value, 1),              // createCombiner: first value with count 1
  (acc: (Int, Int), value) => (acc._1 + value, acc._2 + 1), // mergeValue: add value to running sum and count
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners: combine partition totals
).mapValues { case (sum, count) => sum.toDouble / count } // Calculate average

combined.collect().foreach(println)

Output:

(A, 30.0)
(B, 30.0)

Use Case: Computing per-key averages without groupByKey, which would pull every value for a key into memory before averaging.
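
For comparison, the same average can be computed without combineByKey by packing each value into a (sum, count) pair up front so that reduceByKey applies; a sketch (the name averages is ours):

val averages = data
  .mapValues(v => (v, 1))                                  // pre-pack: value becomes (sum, count)
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))       // add sums and counts pairwise
  .mapValues { case (sum, count) => sum.toDouble / count } // finish with the division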


Example 2: Finding Maximum Value per Key

val data = sc.parallelize(Seq(("X", 5), ("Y", 15), ("X", 25), ("Y", 10), ("X", 50)))

val maxPerKey = data.combineByKey(
  (value) => value,                // createCombiner: first value is the initial max
  (maxValue: Int, value) => math.max(maxValue, value), // mergeValue: max within a partition
  (max1: Int, max2: Int) => math.max(max1, max2) // mergeCombiners: max across partitions
)

maxPerKey.collect().foreach(println)

Output:

(X, 50)
(Y, 15)

Use Case: When determining the maximum transaction amount per user.
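
Because the combiner type here equals the value type (both Int), this is exactly the case where reduceByKey suffices; a sketch of the shorter equivalent (maxPerKey2 is our name):

val maxPerKey2 = data.reduceByKey(math.max(_, _)) // same result: per-key maximum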


Example 3: Concatenating Strings per Key

val data = sc.parallelize(Seq(("A", "apple"), ("B", "banana"), ("A", "avocado"), ("B", "blueberry")))

val combinedStrings = data.combineByKey(
  (value) => value,                                   // createCombiner: first string as-is
  (acc: String, value) => acc + ", " + value,         // mergeValue: append within a partition
  (acc1: String, acc2: String) => acc1 + "; " + acc2  // mergeCombiners: join partition results
)

combinedStrings.collect().foreach(println)

Output:

(A, "apple, avocado")
(B, "banana, blueberry")

Note: The exact separators depend on partitioning: values that meet within one partition are joined with ", " (mergeValue), while partial strings merged across partitions are joined with "; " (mergeCombiners).

Use Case: Useful in text processing, such as grouping all comments by user ID.
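
If the mixed ", "/"; " separators are a problem, one option is to accumulate values into a List and join once at the end, so the separator is always the same. A sketch (the name joined is ours; note that combineByKey makes no ordering guarantee either way):

val joined = data.combineByKey(
  (value: String) => List(value),                            // start a list for the key
  (acc: List[String], value: String) => value :: acc,        // O(1) prepend within a partition
  (acc1: List[String], acc2: List[String]) => acc1 ::: acc2  // concatenate partition results
).mapValues(_.mkString(", "))                                // single, consistent separator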


Example 4: Counting Words per Key

val words = sc.parallelize(Seq(("spark", 1), ("scala", 1), ("spark", 1), ("scala", 1), ("java", 1)))

val wordCounts = words.combineByKey(
  (value) => value,                             // createCombiner: first count as-is
  (count: Int, value) => count + value,         // mergeValue: add counts within a partition
  (count1: Int, count2: Int) => count1 + count2 // mergeCombiners: add counts across partitions
)

wordCounts.collect().foreach(println)

Output:

(spark, 2)
(scala, 2)
(java, 1)

Use Case: The classic word-count aggregation. Because the result type matches the value type, reduceByKey is simpler here (see the sketch below).
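
As noted above, a sketch of the simpler equivalent (wordCounts2 is our name):

val wordCounts2 = words.reduceByKey(_ + _) // same output as the combineByKey version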


Example 5: Grouping Transactions Per Customer

val transactions = sc.parallelize(Seq(("C1", 100), ("C2", 200), ("C1", 300), ("C2", 400)))

val groupedTransactions = transactions.combineByKey(
  (value) => List(value),                            // createCombiner: start a list for the key
  (acc: List[Int], value) => acc :+ value,           // mergeValue: append within a partition (O(n) on List)
  (acc1: List[Int], acc2: List[Int]) => acc1 ++ acc2 // mergeCombiners: concatenate partition results
)

groupedTransactions.collect().foreach(println)

Output:

(C1, List(100, 300))
(C2, List(200, 400))

Use Case: Grouping all transactions for each customer, common in financial applications.
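
List's :+ copies the whole list on every append, so for keys with many values a mutable buffer is cheaper. A sketch (groupedFast is our name; mutating and returning the accumulator is a common pattern here, and Spark's own groupByKey does the same internally):

import scala.collection.mutable.ArrayBuffer

val groupedFast = transactions.combineByKey(
  (value: Int) => ArrayBuffer(value),                               // O(1) amortized appends
  (acc: ArrayBuffer[Int], value: Int) => acc += value,              // append in place
  (acc1: ArrayBuffer[Int], acc2: ArrayBuffer[Int]) => acc1 ++= acc2 // bulk append across partitions
).mapValues(_.toList)                                               // back to an immutable List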


When to Use combineByKey?

  • When the aggregation needs different logic at each stage (initialize, merge within a partition, merge across partitions), or the result type differs from the value type (e.g., a (sum, count) pair built from Int values).
  • When handling large-scale key-value aggregations efficiently.
  • When reducing memory usage compared to groupByKey.

When NOT to Use combineByKey?

  • When the value and result types match and a simple reduceByKey is enough (see the sketch below).
  • When the grouped values for each key comfortably fit in memory and groupByKey is feasible.
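
In fact, reduceByKey is implemented on top of a variant of combineByKey; when the value and result types match, these two calls (names ours, reusing the transactions RDD from Example 5) produce the same per-key sums:

val sums1 = transactions.reduceByKey(_ + _)
val sums2 = transactions.combineByKey(
  (v: Int) => v,                 // identity combiner
  (acc: Int, v: Int) => acc + v, // same addition within a partition
  (a: Int, b: Int) => a + b      // and across partitions
)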

combineByKey is the most general per-key aggregation function in Spark's RDD API, giving you explicit control over how values are initialized, merged within partitions, and combined across them. It is particularly useful for numeric summaries, list building, and any aggregation whose result type differs from its input values. 🚀