Spark combineByKey Explained with 5 Examples
The combineByKey function in Apache Spark is a powerful transformation on RDDs of key-value pairs. It lets you aggregate values by key efficiently by supplying three functions:
- createCombiner – Turns the first value seen for a key in a partition into an initial accumulator (combiner).
- mergeValue – Folds each subsequent value for that key into the accumulator within a partition.
- mergeCombiners – Merges the partial accumulators for a key across partitions.
It is useful for custom aggregations where the result type differs from the input value type, or when standard functions such as reduceByKey and groupByKey are inefficient or insufficient; the basic Scala signature is sketched below.
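For reference, a simplified sketch of the method's shape on a pair RDD of type RDD[(K, V)] (the real method in PairRDDFunctions also has overloads that accept a partitioner or a number of partitions):
def combineByKey[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C
): RDD[(K, C)]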
Example 1: Calculating Average per Key
In this example, we calculate the average value per key.
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("CombineByKeyExample").setMaster("local[*]")
val sc = new SparkContext(conf)
val data = sc.parallelize(Seq(("A", 10), ("B", 20), ("A", 30), ("B", 40), ("A", 50)))
val combined = data.combineByKey(
  (value) => (value, 1),                                    // createCombiner: first value, count 1
  (acc: (Int, Int), value) => (acc._1 + value, acc._2 + 1), // mergeValue: add value to sum, increment count
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners: combine partial sums and counts
).mapValues { case (sum, count) => sum.toDouble / count }   // average = sum / count
combined.collect().foreach(println)
Output:
(A, 30.0)
(B, 30.0)
Use Case: Computing averages efficiently without groupByKey (which can cause memory issues when a key has many values).
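For comparison, a minimal sketch of the groupByKey approach this avoids, using the same data RDD as above. groupByKey materializes every value for a key before the average is computed, while combineByKey only carries a running (sum, count) pair per key.
val avgViaGroup = data.groupByKey().mapValues(values => values.sum.toDouble / values.size)
avgViaGroup.collect().foreach(println)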
Example 2: Finding Maximum Value per Key
val data = sc.parallelize(Seq(("X", 5), ("Y", 15), ("X", 25), ("Y", 10), ("X", 50)))
val maxPerKey = data.combineByKey(
  (value) => value,                                    // createCombiner: first value is the current max
  (maxValue: Int, value) => math.max(maxValue, value), // mergeValue: max within a partition
  (max1: Int, max2: Int) => math.max(max1, max2)       // mergeCombiners: max across partitions
)
maxPerKey.collect().foreach(println)
Output:
(X, 50)
(Y, 15)
Use Case: When determining the maximum transaction amount per user.
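For a plain maximum like this, where the result type is the same as the value type, reduceByKey is usually enough; a sketch equivalent to the example above:
val maxViaReduce = data.reduceByKey((a, b) => math.max(a, b))
maxViaReduce.collect().foreach(println)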
Example 3: Concatenating Strings per Key
val data = sc.parallelize(Seq(("A", "apple"), ("B", "banana"), ("A", "avocado"), ("B", "blueberry")))
val combinedStrings = data.combineByKey(
  (value) => value,                                  // createCombiner: first string for the key
  (acc: String, value) => acc + ", " + value,        // mergeValue: append within a partition
  (acc1: String, acc2: String) => acc1 + "; " + acc2 // mergeCombiners: join partial strings across partitions
)
combinedStrings.collect().foreach(println)
Output (assuming both values of a key end up in the same partition; values merged across partitions are joined with "; " instead):
(A, "apple, avocado")
(B, "banana, blueberry")
Use Case: Useful in text processing, like grouping all user comments by user ID.
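If the separator must not depend on partitioning, one option (a sketch on the same data RDD) is to collect the strings into a list first and join them only once at the end:
val groupedStrings = data.combineByKey(
  (v: String) => List(v),                      // start a list with the first value
  (acc: List[String], v: String) => acc :+ v,  // append within a partition
  (a: List[String], b: List[String]) => a ++ b // concatenate partial lists across partitions
).mapValues(_.mkString(", "))
groupedStrings.collect().foreach(println)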
Example 4: Counting Words per Key
val words = sc.parallelize(Seq(("spark", 1), ("scala", 1), ("spark", 1), ("scala", 1), ("java", 1)))
val wordCounts = words.combineByKey(
  (value) => value,                             // createCombiner: first count for the word
  (count: Int, value) => count + value,         // mergeValue: add counts within a partition
  (count1: Int, count2: Int) => count1 + count2 // mergeCombiners: add partial counts across partitions
)
wordCounts.collect().foreach(println)
Output:
(spark, 2)
(scala, 2)
(java, 1)
Use Case: Word-count style aggregations in big data processing.
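A short sketch extending this to raw text (the sample lines are made up for illustration): split each line into words, map every word to (word, 1), and feed the pairs into the same combineByKey logic.
val lines = sc.parallelize(Seq("spark scala spark", "java scala"))
val counts = lines
  .flatMap(_.split("\\s+"))          // split lines into words
  .map(word => (word, 1))            // pair each word with a count of 1
  .combineByKey(
    (v: Int) => v,
    (c: Int, v: Int) => c + v,
    (c1: Int, c2: Int) => c1 + c2
  )
counts.collect().foreach(println)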
Example 5: Grouping Transactions Per Customer
val transactions = sc.parallelize(Seq(("C1", 100), ("C2", 200), ("C1", 300), ("C2", 400)))
val groupedTransactions = transactions.combineByKey(
  (value) => List(value),                            // createCombiner: start a list for the key
  (acc: List[Int], value) => acc :+ value,           // mergeValue: append within a partition
  (acc1: List[Int], acc2: List[Int]) => acc1 ++ acc2 // mergeCombiners: concatenate lists across partitions
)
groupedTransactions.collect().foreach(println)
Output:
(C1, List(100, 300))
(C2, List(200, 400))
Use Case: Useful in financial applications where transactions per customer need to be grouped.
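A follow-up sketch on the same RDD: once transactions are grouped, a per-customer total is one mapValues away (the order of values inside each list may vary with partitioning).
val totalPerCustomer = groupedTransactions.mapValues(_.sum)
totalPerCustomer.collect().foreach(println)  // e.g. (C1,400), (C2,600)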
When to Use combineByKey?
- When the aggregation tracks several things at once (e.g. a sum and a count) or produces a result type different from the input values.
- When handling large-scale key-value aggregations efficiently.
- When reducing memory usage compared to groupByKey.
When NOT to Use combineByKey?
- When simple aggregations like reduceByKey are sufficient (see the sketch below).
- When the data per key fits comfortably in memory and a groupByKey operation is feasible.
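To illustrate the first point, the word-count pairs from Example 4 can be aggregated with reduceByKey alone; a sketch that produces the same result as the combineByKey version:
val wordCountsSimple = words.reduceByKey(_ + _)
wordCountsSimple.collect().foreach(println)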
combineByKey is a flexible aggregation function in Spark, offering custom transformation capabilities per key. It is particularly useful for numeric calculations, list concatenation, or hierarchical data aggregation across partitions. 🚀