Spark Accumulators
An accumulator is a shared variable that tasks running on executors can only add to, never read. The driver program reads the merged value after an action has completed the job. Accumulators are designed for side-effect metrics: counting bad records, tracking processing events, or computing sums across distributed tasks without changing the main data flow.
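The add-only, driver-reads model can be pictured with a small plain-Python simulation. This is not Spark code and not how Spark implements accumulators internally; `run_task` and `run_job` are hypothetical names. Each simulated task starts from the zero value, adds to a private copy, and hands back only its partial result, which the simulated driver merges:

```python
# Plain-Python model of accumulator semantics (no Spark involved).
# Each "task" adds to a private local copy and returns only its partial value;
# the "driver" merges the partials once every task has finished.

def run_task(partition, zero=0):
    local = zero                  # task-local copy, starts at the zero value
    for record in partition:
        if "ERROR" in record:
            local += 1            # tasks can only add
    return local                  # the partial result is sent back to the driver

def run_job(partitions):
    driver_value = 0
    for p in partitions:
        driver_value += run_task(p)   # only the driver merges and reads the total
    return driver_value

partitions = [
    ["INFO start", "ERROR disk full"],
    ["ERROR timeout", "INFO ok", "ERROR timeout"],
    [],
]
print(run_job(partitions))  # 3
```

Because each task only ever holds its own partial value, there is no meaningful global total to read until the driver has merged every task's contribution.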
Basic Accumulator
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Accumulator Demo").getOrCreate()
sc = spark.sparkContext
error_count = sc.accumulator(0)
total_records = sc.accumulator(0)
def process_record(record):
    total_records.add(1)
    if "ERROR" in record:
        error_count.add(1)
    return record
rdd = sc.textFile("s3://bucket/logs/*.log")
rdd.map(process_record).count()  # The action triggers the job
# Values are only meaningful on the driver, after the action completes
print(f"Total: {total_records.value}")
print(f"Errors: {error_count.value}")
if total_records.value > 0:  # avoid ZeroDivisionError on empty input
    print(f"Error rate: {error_count.value / total_records.value:.2%}")
Accumulator for Data Quality Monitoring
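The row checks in this section are easier to trust if they are written as pure predicates and tested on plain data before being wired to accumulators. A sketch, assuming hypothetical helper names `is_missing_id` and `is_invalid_age` (not part of PySpark) and the same 0 to 130 age range used in `validate_row`. Because the helpers only index rows by field name, they should accept either plain dicts or PySpark `Row` objects:

```python
# Validation rules as pure functions, testable without a cluster.
# MIN_AGE/MAX_AGE mirror the 0..130 range checked in validate_row (an assumption
# about what counts as a valid age; adjust to your data).
MIN_AGE, MAX_AGE = 0, 130

def is_missing_id(row):
    """True when customer_id is None."""
    return row["customer_id"] is None

def is_invalid_age(row):
    """True when age is present but outside [MIN_AGE, MAX_AGE]; a null age is not counted."""
    age = row["age"]
    return age is not None and not (MIN_AGE <= age <= MAX_AGE)

rows = [
    {"customer_id": 1, "age": 34},
    {"customer_id": None, "age": 20},
    {"customer_id": 3, "age": -5},
    {"customer_id": 4, "age": None},
    {"customer_id": 5, "age": 131},
]
# Booleans sum as 0/1, giving the counts the accumulators should end up with.
print(sum(is_missing_id(r) for r in rows), sum(is_invalid_age(r) for r in rows))  # 1 2
```

Inside `validate_row`, each `if` could then call the matching predicate, so the logic that feeds the accumulators is the same logic covered by local tests.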
null_count = sc.accumulator(0)
invalid_age = sc.accumulator(0)
def validate_row(row):
    if row["customer_id"] is None:
        null_count.add(1)
    if row["age"] is not None and (row["age"] < 0 or row["age"] > 130):
        invalid_age.add(1)
    return row
df = spark.read.parquet("customers.parquet")
df.rdd.map(validate_row).count()
print(f"Null customer IDs: {null_count.value}")
print(f"Invalid ages: {invalid_age.value}")
Custom Accumulator Types
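A custom accumulator needs two methods: `zero`, which supplies each task's starting value, and `addInPlace`, which merges a new term into an existing value. In PySpark, `addInPlace` is called both for every `add()` on an executor and when the driver merges task results, so it has to accept either a single term or a whole partial value. The sketch below exercises that contract in plain Python with a hypothetical `CounterParam` that counts occurrences per error type instead of only collecting unique ones. In a real job it would subclass `pyspark.AccumulatorParam` and be registered with `sc.accumulator(Counter(), CounterParam())`:

```python
from collections import Counter

# Hypothetical stand-in with the two methods an AccumulatorParam subclass defines.
# Kept as plain Python so the merge logic can be tested without a SparkContext.
class CounterParam:
    def zero(self, initial_value):
        # Every task starts from an empty counter.
        return Counter()

    def addInPlace(self, v1, v2):
        # v2 is either a whole Counter (a merged partial result) or a single key.
        if isinstance(v2, Counter):
            v1.update(v2)
        else:
            v1[v2] += 1
        return v1

param = CounterParam()

# Simulate two tasks, each accumulating into its own zero value.
task_a = param.zero(None)
for err in ["Timeout", "DiskFull", "Timeout"]:
    task_a = param.addInPlace(task_a, err)
task_b = param.addInPlace(param.zero(None), "Timeout")

# The simulated driver merges task results with the same addInPlace.
total = param.addInPlace(param.zero(None), task_a)
total = param.addInPlace(total, task_b)
print(dict(total))  # {'Timeout': 3, 'DiskFull': 1}
```

Mutating and returning `v1` is fine here because each task works on its own copy; the important property is that merging is associative and commutative, since Spark does not guarantee the order in which task results arrive.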
from pyspark import AccumulatorParam
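class SetAccumulator(AccumulatorParam):
    """Accumulator that collects unique values."""
    def zero(self, initial_value):
        # Each task starts from an empty set
        return set()
    def addInPlace(self, v1, v2):
        # Accept either a set (a partial result, or a set passed to add()) or a single value
        v1.update(v2 if isinstance(v2, set) else {v2})
        return v1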
unique_errors = sc.accumulator(set(), SetAccumulator())
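def collect_error_types(line):
    # Test for the exact "[ERROR]" marker we split on; checking only "ERROR"
    # would raise IndexError on lines that contain ERROR without brackets
    if "[ERROR]" in line:
        error_type = line.split("[ERROR]")[1].strip().split(":")[0]
        unique_errors.add({error_type})
    return line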
sc.textFile("logs.txt").foreach(collect_error_types)
print(f"Unique error types: {unique_errors.value}")
Important Limitations
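The first limitation below is easiest to see through recomputation: Spark does not keep the results of an uncached RDD, so every action re-runs the transformations in its lineage, side effects included. The sketch below is a plain-Python analogy, not Spark code; `ToyAccumulator`, `tag` and `pipeline` are hypothetical names, and Python's lazy built-in `map()` stands in for an RDD transformation:

```python
# Plain-Python analogy (not Spark): map() is lazy like an RDD transformation,
# so its side effects run only when consumed, and again on every re-evaluation.

class ToyAccumulator:
    """Hypothetical stand-in for an accumulator: add() in tasks, value on the driver."""
    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n

counter = ToyAccumulator()
records = ["a", "b", "c"]

def tag(x):
    counter.add(1)  # side effect inside a "transformation"
    return x

def pipeline():
    # Models an uncached rdd.map(tag): building it executes nothing.
    return map(tag, records)

mapped = pipeline()                   # "transformation" defined, nothing executed
before = counter.value                # still 0: no "action" has run
first = sum(1 for _ in mapped)        # first "action" (like count()): counter reaches 3
second = sum(1 for _ in pipeline())   # second "action" recomputes the lineage: counter reaches 6
print(before, first, second, counter.value)  # 0 3 3 6
```

In Spark, calling `cache()` on the mapped RDD before the first action, or moving the `add` call into `foreach`, avoids this particular recount, although cached partitions can still be evicted and recomputed.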
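# 1. Updates made inside transformations may be applied more than once.
# If a task is retried, a stage is re-executed, or an uncached RDD is recomputed
# by a second action, the transformation runs again and so does counter.add()
counter = sc.accumulator(0)
# BAD: update inside a transformation, may be applied more than once
mapped = rdd.map(lambda x: (counter.add(1), x)[1])
# GOOD: update inside an action such as foreach; Spark applies each task's
# update only once, even if the task is retried
rdd.foreach(lambda x: counter.add(1))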
# 2. Accumulators are write-only inside tasks
def bad_function(x):
    print(counter.value)  # PySpark raises an error: .value cannot be read inside a task
    counter.add(1)
# 3. Values are only meaningful on the driver, after an action has run the job
DataFrame Alternative (Preferred)
For most data quality checks, built-in DataFrame aggregations are simpler and more reliable: the metrics are part of the query result, so task retries and recomputation cannot inflate them.
from pyspark.sql import functions as F
df = spark.read.parquet("customers.parquet")
quality = df.agg(
    F.count("*").alias("total"),
    F.sum(F.when(F.col("customer_id").isNull(), 1).otherwise(0)).alias("null_ids"),
    # Same 0..130 range as validate_row; a null age makes the condition null, so it counts as 0
    F.sum(F.when((F.col("age") < 0) | (F.col("age") > 130), 1).otherwise(0)).alias("invalid_ages"),
    F.countDistinct("customer_id").alias("unique_customers"),
)
quality.show()
Use accumulators when you need metrics collected inside a transformation without changing its output schema, or for side-channel monitoring in Spark Streaming jobs, keeping the double-counting caveats above in mind.