**Spark Accumulators with Examples**
What is a Spark Accumulator?
In Apache Spark, an Accumulator is a shared variable used to aggregate values across tasks in a distributed manner. It is primarily used for counting, summing, and debugging. Tasks running on worker nodes can only add to an accumulator; its value can be read only by the driver program.
When to Use Spark Accumulators?
- Logging and Debugging: Tracking values across worker nodes without affecting the main computation (see the sketch after this list).
- Counting Events: Keeping track of errors, missing values, or processed records.
- Summing Values: Accumulating metrics like total sales, total visitors, etc.
- Monitoring Execution: Checking how many tasks have been executed.
- Performance Metrics: Gathering statistics like failed records, null values, etc.
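The examples below use longAccumulator, which covers most counting cases. SparkContext also provides doubleAccumulator and collectionAccumulator for summing floating-point values and for collecting items (useful as lightweight logging). The following is a minimal sketch of all three used together; the sample data and accumulator names are illustrative, and it assumes Spark 2.x+ with an active SparkContext named sc (created as in Example 1 below).
// Minimal sketch (illustrative data): the three built-in accumulator types on SparkContext.
val errorCount = sc.longAccumulator("Error Count")               // counting events
val totalRevenue = sc.doubleAccumulator("Total Revenue")         // summing values
val badRecords = sc.collectionAccumulator[String]("Bad Records") // collecting log entries
val orders = sc.parallelize(Seq("book,12.50", "pen,1.20", "corrupt-line", "lamp,30.00"))
orders.foreach { line =>
  line.split(",") match {
    case Array(_, price) => totalRevenue.add(price.toDouble)
    case _ =>
      errorCount.add(1)
      badRecords.add(line) // keep the offending record for inspection on the driver
  }
}
println(s"Revenue: ${totalRevenue.value}, errors: ${errorCount.value}, bad records: ${badRecords.value}")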
Example 1: Counting Number of Processed Records
This example counts the number of processed records in an RDD using an accumulator.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("AccumulatorExample").setMaster("local")
val sc = new SparkContext(conf)

// Define an accumulator
val recordCounter = sc.longAccumulator("Record Counter")

// Sample data
val data = sc.parallelize(Seq("A", "B", "C", "D", "E"))

// Process the data, incrementing the counter once per record
data.foreach(record => recordCounter.add(1))

println(s"Total records processed: ${recordCounter.value}")

// Stop the SparkContext only when you are done; Examples 2-4 below reuse this sc.
// sc.stop()

Use Case: Helps in monitoring the number of records processed without modifying the original dataset.
Example 2: Summing Values Using Accumulator
This example calculates the total sum of elements in an RDD using an accumulator.
val sumAccumulator = sc.longAccumulator("Sum Accumulator")
val numbers = sc.parallelize(Seq(10, 20, 30, 40, 50))
numbers.foreach(num => sumAccumulator.add(num))
println(s"Total Sum: ${sumAccumulator.value}")Use Case: Useful in ETL jobs where we need to calculate aggregate metrics like total revenue or sales.
Example 3: Counting Missing Values in a Dataset
This example counts missing values (null or empty) in an RDD.
val missingValuesAccumulator = sc.longAccumulator("Missing Values")
val dataset = sc.parallelize(Seq("John", "", "Alice", "Bob", "", "Eve"))
dataset.foreach(value => if (value.isEmpty) missingValuesAccumulator.add(1))
println(s"Total missing values: ${missingValuesAccumulator.value}")Use Case: Helps in data quality monitoring, identifying missing or incorrect records.
Example 4: Monitoring Error Occurrences
This example tracks the number of failed records in a dataset.
val errorAccumulator = sc.longAccumulator("Error Counter")
val records = sc.parallelize(Seq("10", "20", "invalid", "30", "error", "40"))
records.foreach { record =>
  try {
    val num = record.toInt // parsing succeeds for valid numbers; num itself is not used
  } catch {
    case _: NumberFormatException => errorAccumulator.add(1)
  }
}

println(s"Total errors encountered: ${errorAccumulator.value}")

Use Case: Useful in big data processing to monitor and count erroneous records.
Example 5: Tracking Processed Records in a DataFrame (Using Spark SQL)
This example tracks how many rows are processed in a DataFrame.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("AccumulatorExample").master("local").getOrCreate()
val sc = spark.sparkContext
val processedRecords = sc.longAccumulator("Processed Records")
import spark.implicits._

val df = Seq(("Alice", 25), ("Bob", 30), ("Eve", 35)).toDF("Name", "Age")
df.foreach(row => processedRecords.add(1))
println(s"Total rows processed: ${processedRecords.value}")Use Case: Helps in tracking the number of records processed in structured data processing.
Where NOT to Use Spark Accumulators?
- Replacing Reduce Operations: Accumulators are not a replacement for reduce() or aggregate(); they aggregate via side effects rather than returning a value as part of the computation.
- Returning Values from Tasks: Workers can only write to accumulators; they cannot read values from them.
- Using in Transformations: Accumulator updates inside transformations (map, filter) are not guaranteed to be applied exactly once, because a stage may be recomputed or a task retried on failure (see the sketch below).
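To make the last point concrete, here is a minimal sketch (not part of the numbered examples above; the data is illustrative) of why updating an accumulator inside a transformation is unreliable: an un-cached RDD is recomputed for every action, so the update runs more than once per element.
val mapCounter = sc.longAccumulator("Map Counter")

// The accumulator is updated inside a transformation (map), not an action.
val doubled = sc.parallelize(1 to 5).map { x =>
  mapCounter.add(1)
  x * 2
}

doubled.count()   // first action: the map runs and the counter reaches 5
doubled.collect() // second action: the un-cached RDD is recomputed, so the counter grows again

// Typically prints 10, not the expected 5; transformation-side updates may also be
// repeated when tasks are retried or stages are re-executed.
println(s"Elements counted: ${mapCounter.value}")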
Spark Accumulators are powerful for debugging, tracking metrics, and monitoring errors in distributed applications. They should be used only for side-effect bookkeeping such as logging, counting, and aggregating values that are not part of the main computation's result.