Spark Accumulators

An accumulator is a shared variable that executors can only add to — never read. The driver program reads the final accumulated value after the job completes. Accumulators are designed for side-effect metrics: counting bad records, tracking processing events, or computing sums across distributed tasks without modifying the main data flow.
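The add-only contract can be pictured with a small pure-Python model. This is a sketch of the semantics only, not Spark's implementation: `ToyAccumulator`, `run_task`, and the sample partitions are hypothetical names made up for illustration. Each task adds to its own local copy starting from zero, and only the driver merges the per-task results and reads the total.

```python
class ToyAccumulator:
    """Add-only counter: tasks call add(), only the driver reads value."""

    def __init__(self, initial=0):
        self._value = initial

    def add(self, term):
        self._value += term

    @property
    def value(self):
        return self._value


def run_task(partition):
    """Simulate one task: count matching records into a task-local accumulator."""
    local = ToyAccumulator(0)  # each task starts from zero, not the driver's value
    for record in partition:
        if "ERROR" in record:
            local.add(1)
    return local.value  # the partial result sent back to the driver


# Hypothetical sample data split into three partitions.
partitions = [
    ["ok", "ERROR disk full"],
    ["ERROR timeout", "ERROR timeout", "ok"],
    ["ok"],
]

driver_acc = ToyAccumulator(0)
for part in partitions:
    driver_acc.add(run_task(part))  # driver merges each task's partial count

print(driver_acc.value)  # 3
```

Because tasks only ever see their own partial count, a running total is not available inside a task, which is why the value is read on the driver.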


Basic Accumulator

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Accumulator Demo").getOrCreate()
sc = spark.sparkContext

error_count = sc.accumulator(0)
total_records = sc.accumulator(0)

def process_record(record):
    total_records.add(1)
    if "ERROR" in record:
        error_count.add(1)
    return record

rdd = sc.textFile("s3://bucket/logs/*.log")
rdd.map(process_record).count()  # Action triggers the job

# Only readable on the driver AFTER the action completes
print(f"Total: {total_records.value}")
print(f"Errors: {error_count.value}")
if total_records.value:  # Avoid dividing by zero when the input is empty
    print(f"Error rate: {error_count.value / total_records.value:.2%}")

Accumulator for Data Quality Monitoring

null_count = sc.accumulator(0)
invalid_age = sc.accumulator(0)

def validate_row(row):
    if row["customer_id"] is None:
        null_count.add(1)
    if row["age"] is not None and (row["age"] < 0 or row["age"] > 130):
        invalid_age.add(1)
    return row

df = spark.read.parquet("customers.parquet")
df.rdd.map(validate_row).count()  # Action triggers the validation pass

print(f"Null customer IDs: {null_count.value}")
print(f"Invalid ages: {invalid_age.value}")

Custom Accumulator Types

from pyspark import AccumulatorParam

class SetAccumulator(AccumulatorParam):
    """Accumulator that collects unique values."""

    def zero(self, initial_value):
        return set()

    def addInPlace(self, v1, v2):
        # v2 is either another partial set (merge) or a single value (add)
        v1.update(v2 if isinstance(v2, set) else {v2})
        return v1

unique_errors = sc.accumulator(set(), SetAccumulator())

def collect_error_types(line):
    # Match the full "[ERROR]" tag so the split below always has a second part
    if "[ERROR]" in line:
        error_type = line.split("[ERROR]")[1].strip().split(":")[0]
        unique_errors.add({error_type})

sc.textFile("logs.txt").foreach(collect_error_types)
print(f"Unique error types: {unique_errors.value}")
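Spark merges per-task partial values in whatever order tasks finish, so the merge operation should be associative and commutative. One way to sanity-check that property without a cluster is to exercise the same `zero` and `addInPlace` logic in plain Python. The sketch below uses `SetMerge`, a hypothetical stand-in that mirrors the two methods of `SetAccumulator` but does not subclass `AccumulatorParam`, so it runs without Spark installed. The partial values are made-up sample data.

```python
from functools import reduce
from itertools import permutations


class SetMerge:
    """Stand-in with the same zero/addInPlace logic as SetAccumulator."""

    def zero(self, initial_value):
        return set()

    def addInPlace(self, v1, v2):
        v1.update(v2 if isinstance(v2, set) else {v2})
        return v1


def merge_all(param, partials):
    """Fold partial values into a fresh zero value, like a driver-side merge."""
    # Copy each partial so the in-place update never mutates the inputs.
    return reduce(param.addInPlace, (set(p) for p in partials), param.zero(set()))


# Hypothetical per-task partial results.
partials = [{"Timeout"}, {"DiskFull", "Timeout"}, set(), {"Auth"}]

param = SetMerge()
results = [merge_all(param, order) for order in permutations(partials)]

# Every merge order yields the same set of unique values.
print(all(r == results[0] for r in results))  # True
print(sorted(results[0]))  # ['Auth', 'DiskFull', 'Timeout']
```

Set union passes this check by construction. The same harness is more useful for custom merges such as "keep the latest value", where the order of merging can change the result.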

Important Limitations

counter = sc.accumulator(0)

# 1. Accumulators in transformations may double-count
# If a task or stage is re-executed (for example after a failure, or when an
# uncached RDD is recomputed), its updates can be applied again
# BAD: transformation, may be re-executed
mapped = rdd.map(lambda x: (counter.add(1), x)[1])
# GOOD: foreach (an action; Spark applies each task's updates exactly once)
rdd.foreach(lambda x: counter.add(1))

# 2. Accumulators are write-only in executors
def bad_function(x):
    print(counter.value)  # Raises an exception inside tasks: value is driver-only
    counter.add(1)

# 3. Only valid after an action triggers the job
# Transformations are lazy, so the accumulator keeps its initial value until then
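The double-counting problem in point 1 can be illustrated without a cluster. The following toy simulation is a simplified model of the bookkeeping, not Spark's scheduler: `count_records`, the `executions` log, and the sample partitions are all hypothetical. It contrasts applying the update from every execution of a partition with applying at most one update per partition, which is the kind of guarantee Spark gives for accumulator updates made inside actions.

```python
def count_records(partition):
    """Work done by one task execution: returns that execution's update."""
    return len(partition)


# Hypothetical data: two partitions holding three and two records.
partitions = {0: ["a", "b", "c"], 1: ["d", "e"]}

# Execution log: partition 0 is computed twice, e.g. because it was re-executed.
executions = [0, 1, 0]

# Transformation-style accounting: every execution's update is applied.
naive_total = 0
for pid in executions:
    naive_total += count_records(partitions[pid])

# Action-style accounting: at most one update per partition is applied.
applied = set()
deduped_total = 0
for pid in executions:
    if pid in applied:
        continue  # update from a re-executed task is ignored
    applied.add(pid)
    deduped_total += count_records(partitions[pid])

print(naive_total)    # 8
print(deduped_total)  # 5
```

The data contains five records, so only the deduplicated total is correct. The naive total over-counts by exactly the size of the partition that ran twice.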

DataFrame Alternative (Preferred)

For most data quality checks, built-in DataFrame operations are simpler and more reliable:

from pyspark.sql import functions as F

df = spark.read.parquet("customers.parquet")
quality = df.agg(
    F.count("*").alias("total"),
    F.sum(F.when(F.col("customer_id").isNull(), 1).otherwise(0)).alias("null_ids"),
    # Same range check as the accumulator version: age below 0 or above 130
    F.sum(
        F.when((F.col("age") < 0) | (F.col("age") > 130), 1).otherwise(0)
    ).alias("invalid_ages"),
    F.countDistinct("customer_id").alias("unique_customers"),
)
quality.show()

Use accumulators when you need metrics from inside a transformation that you don’t want to affect the output schema, or for side-channel monitoring in Spark Streaming jobs.