
Spark Lazy Evaluation

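Lazy evaluation means Spark does not execute a transformation the moment you write it. Instead, each transformation is recorded as a step in a plan: the lineage graph of an RDD, or the logical plan of a DataFrame. Only when an action (such as count(), collect(), or a write) is called does Spark turn that plan into a physical plan and run it. For DataFrames, the Catalyst optimizer rewrites the logical plan first. This is one of the most important design decisions in Spark. It is also a large part of why Spark can outperform a chain of hand-written MapReduce jobs: seeing the whole plan up front lets it pipeline steps together and skip work that step-by-step execution would have to do.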


How It Works: The DAG

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("Lazy Demo").getOrCreate()
sc = spark.sparkContext
# Each transformation below returns immediately: no data is read or processed
rdd = sc.textFile("s3://bucket/weblogs/*.log")  # Step 1: registered
errors = rdd.filter(lambda line: "ERROR" in line)  # Step 2: registered
# Join the words back into one string: distinct() needs hashable elements,
# and Python lists are not hashable
messages = errors.map(lambda line: " ".join(line.split(" ")[2:]))  # Step 3: registered
unique_messages = messages.distinct()  # Step 4: registered
# ← The four transformations form a DAG (Directed Acyclic Graph)
# NOTHING has run yet.
# count() is an action: it submits the DAG to the scheduler, and only NOW does it run
count = unique_messages.count()  # ← Triggers execution
print(f"Unique error types: {count}")
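To make the record-then-run behavior concrete, here is a toy model in plain Python. LazyDataset and read_logs are hypothetical names, not Spark APIs: the class stores transformations as a chain of steps and does no work until collect() or count() is called. It is a sketch of the idea only; Spark additionally splits the DAG into stages at shuffle boundaries and distributes the work across executors.

```python
# Toy model of lazy evaluation in plain Python (NOT Spark's implementation):
# transformations only append steps to a plan; an action runs the plan.
from typing import Any, Callable, Iterable, List, Optional, Tuple

Step = Tuple[str, Optional[Callable[[Any], Any]]]


class LazyDataset:
    """Hypothetical stand-in for an RDD: a data source plus recorded steps."""

    def __init__(self, source: Callable[[], Iterable[Any]], steps: Tuple[Step, ...] = ()):
        self.source = source  # only called when an action runs
        self.steps = steps    # the recorded plan (a linear chain here, not a full DAG)

    def _then(self, kind: str, fn: Optional[Callable[[Any], Any]] = None) -> "LazyDataset":
        # Every transformation returns a NEW dataset with one more step; no I/O
        return LazyDataset(self.source, self.steps + ((kind, fn),))

    def filter(self, pred: Callable[[Any], bool]) -> "LazyDataset":
        return self._then("filter", pred)

    def map(self, fn: Callable[[Any], Any]) -> "LazyDataset":
        return self._then("map", fn)

    def distinct(self) -> "LazyDataset":
        return self._then("distinct")

    def collect(self) -> List[Any]:
        # Action: only now is the source read and each recorded step applied
        rows: List[Any] = list(self.source())
        for kind, fn in self.steps:
            if kind == "filter":
                rows = [r for r in rows if fn(r)]
            elif kind == "map":
                rows = [fn(r) for r in rows]
            elif kind == "distinct":
                rows = list(dict.fromkeys(rows))  # dedupe, keeping first-seen order
        return rows

    def count(self) -> int:
        return len(self.collect())


reads: List[int] = []  # one entry per actual read of the "file"


def read_logs() -> List[str]:
    reads.append(1)
    return ["t1 ERROR disk full", "t2 INFO ok", "t3 ERROR disk full", "t4 ERROR timeout"]


logs = LazyDataset(read_logs)
errors = logs.filter(lambda line: "ERROR" in line)
messages = errors.map(lambda line: " ".join(line.split(" ")[2:]))
unique_messages = messages.distinct()
print(len(reads))               # 0: nothing has been read yet
print(unique_messages.count())  # 2: "disk full" and "timeout"
print(len(reads))               # 1: the action triggered exactly one read
```

Unlike this toy, which builds a full intermediate list after every step, Spark streams each partition's rows through all of a stage's narrow transformations at once.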

Visualizing the DAG

You can inspect the DAG and its stages in the Spark UI (port 4040 by default, available while the application is running), or print a DataFrame's query plans programmatically:

# Build a DataFrame: this only defines the plan, nothing is read yet
df = spark.read.parquet("employees.parquet") \
    .filter(F.col("salary") > 80000) \
    .groupBy("department") \
    .agg(F.avg("salary").alias("avg_salary"))
# Print the physical plan (default "simple" mode)
df.explain()
# The mode argument requires Spark 3.0+
df.explain(mode="extended")   # Parsed, analyzed, and optimized logical plans + physical plan
df.explain(mode="formatted")  # Physical plan outline followed by per-operator details

Sample output:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[department#12], functions=[avg(salary#14)])
   +- Exchange hashpartitioning(department#12, 200), ENSURE_REQUIREMENTS
      +- HashAggregate(keys=[department#12], functions=[partial_avg(salary#14)])
         +- Filter (salary#14 > 80000)
            +- FileScan parquet [department#12,salary#14] PushedFilters=[IsNotNull(salary), GreaterThan(salary,80000.0)]

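Notice PushedFilters: Catalyst handed the predicate to the Parquet reader, which can use each row group's min/max statistics to skip row groups that cannot contain a matching row. Pushdown is a coarse pre-filter rather than an exact one, so the Filter operator stays in the plan to check the rows that are actually read. The column list [department#12,salary#14] also shows column pruning: only the two columns the query uses are read from disk.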
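A toy model in plain Python can show why this pushdown is coarse. It assumes a simplified file in which each row group carries the minimum and maximum of its salary column, as Parquet footers do; RowGroup and scan_salary_gt are hypothetical names, and real readers also use page indexes, dictionaries, and other structures.

```python
# Toy model of Parquet-style row-group skipping (illustrative only).
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class RowGroup:
    salaries: List[int]

    # In a real Parquet file these statistics live in the footer metadata,
    # so the reader can consult them without reading the row group itself
    @property
    def stat_min(self) -> int:
        return min(self.salaries)

    @property
    def stat_max(self) -> int:
        return max(self.salaries)


def scan_salary_gt(row_groups: List[RowGroup], threshold: int) -> Tuple[List[int], int]:
    """Return (matching salaries, number of row groups actually read)."""
    matches: List[int] = []
    groups_read = 0
    for rg in row_groups:
        # Pushed filter: no row in this group can satisfy salary > threshold
        if rg.stat_max <= threshold:
            continue
        groups_read += 1
        # The group is read in full and may still hold non-matching rows,
        # which is why a Filter operator remains above the scan
        matches.extend(s for s in rg.salaries if s > threshold)
    return matches, groups_read


groups = [
    RowGroup([40_000, 55_000, 62_000]),  # max 62k: skipped without reading
    RowGroup([70_000, 85_000, 90_000]),  # max 90k: read; 2 of 3 rows match
    RowGroup([95_000, 120_000]),         # max 120k: read; every row matches
]
rows, groups_read = scan_salary_gt(groups, 80_000)
print(rows, groups_read)  # [85000, 90000, 95000, 120000] 2
```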


What the Optimizer Does With Lazy Plans

Because Spark has the complete plan before executing, it can apply several optimizations automatically:

1. Predicate Pushdown

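# You write this:
df = spark.read.parquet("sales.parquet") \
    .filter(F.col("year") == 2025) \
    .select("product", "revenue")
# Spark executes roughly this:
# Push year == 2025 to the Parquet reader, which skips row groups whose statistics
#   rule out 2025 (or whole directories, if the data is partitioned by year)
# Read only the columns the query touches: 'product', 'revenue', and 'year'
#   for the filter (column pruning)
# Drop non-matching rows during the scan, before any later operator sees them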
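Column pruning follows from the columnar layout. The sketch below is a toy columnar table in plain Python; sales_file, read_column, and filter_then_select are hypothetical names, not Spark or Parquet APIs. It records which columns a filter-then-select query actually touches.

```python
# Toy columnar table: each column is stored separately, as in Parquet,
# so a reader can fetch only the columns a query needs (illustrative only).
from typing import Dict, List

sales_file: Dict[str, List] = {
    "year":    [2024, 2025, 2025, 2023],
    "product": ["a", "b", "c", "d"],
    "revenue": [10.0, 20.0, 30.0, 40.0],
    "notes":   ["x", "y", "z", "w"],  # never needed by the query below
}
columns_read: List[str] = []


def read_column(name: str) -> List:
    columns_read.append(name)  # record the simulated I/O
    return sales_file[name]


def filter_then_select(year: int, output_cols: List[str]) -> List[tuple]:
    # The filter column must be read too, even though it is not selected
    years = read_column("year")
    keep = [i for i, y in enumerate(years) if y == year]
    cols = [read_column(c) for c in output_cols]
    return [tuple(col[i] for col in cols) for i in keep]


print(filter_then_select(2025, ["product", "revenue"]))  # [('b', 20.0), ('c', 30.0)]
print(columns_read)  # ['year', 'product', 'revenue']: 'notes' is never touched
```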

2. Operator Fusion (Narrow Transformations)

df = spark.read.parquet("logs.parquet") \
    .filter(F.col("level") == "ERROR") \
    .withColumn("msg_upper", F.upper(F.col("message"))) \
    .select("timestamp", "msg_upper")

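Spark pipelines all three narrow transformations above into a single stage, and whole-stage code generation compiles the filter, the new column, and the projection into one generated function. Each row is read, filtered, transformed, and projected in one pass over the data instead of three.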
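The difference between fused and unfused execution can be sketched in plain Python. This is an analogy under simplifying assumptions, not Spark's code generator: rows are dictionaries, the unfused version materializes an intermediate list after each operator, and the fused version runs one loop that applies the filter, the new column, and the projection to each row in turn.

```python
# Toy illustration of operator fusion (not Spark's whole-stage codegen).
from typing import Dict, List

Row = Dict[str, str]

logs: List[Row] = [
    {"timestamp": "t1", "level": "ERROR", "message": "disk full"},
    {"timestamp": "t2", "level": "INFO", "message": "ok"},
    {"timestamp": "t3", "level": "ERROR", "message": "timeout"},
]


def unfused(rows: List[Row]) -> List[Row]:
    step1 = [r for r in rows if r["level"] == "ERROR"]                 # pass 1
    step2 = [{**r, "msg_upper": r["message"].upper()} for r in step1]  # pass 2
    return [{"timestamp": r["timestamp"], "msg_upper": r["msg_upper"]}
            for r in step2]                                            # pass 3


def fused(rows: List[Row]) -> List[Row]:
    out: List[Row] = []
    for r in rows:                     # a single pass over the input
        if r["level"] != "ERROR":      # filter
            continue
        out.append({"timestamp": r["timestamp"],          # select
                    "msg_upper": r["message"].upper()})   # withColumn
    return out


assert unfused(logs) == fused(logs)  # same result, fewer intermediate lists
print(fused(logs))  # [{'timestamp': 't1', 'msg_upper': 'DISK FULL'}, {'timestamp': 't3', 'msg_upper': 'TIMEOUT'}]
```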

3. Constant Folding

# You write:
df = df.withColumn("tax_rate", F.lit(0.18) * F.lit(100))
# The optimizer folds 0.18 * 100 into the literal 18.0 at plan time,
# so no multiplication is performed per row at runtime
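Constant folding is a bottom-up rewrite of the expression tree. The sketch below is a minimal toy in plain Python, in the spirit of Catalyst's ConstantFolding rule but not its implementation; Lit, Col, Mul, fold, and evaluate are hypothetical names.

```python
# Toy constant-folding pass: any multiplication whose inputs are both
# literals is replaced by a single literal, working bottom-up.
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Lit:
    value: float


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Mul:
    left: "Expr"
    right: "Expr"


Expr = Union[Lit, Col, Mul]


def fold(e: Expr) -> Expr:
    if isinstance(e, Mul):
        left, right = fold(e.left), fold(e.right)
        if isinstance(left, Lit) and isinstance(right, Lit):
            return Lit(left.value * right.value)  # computed once, at "plan time"
        return Mul(left, right)
    return e  # literals and column references are left unchanged


def evaluate(e: Expr, row: dict) -> float:
    # Per-row evaluation: a folded literal costs no arithmetic here
    if isinstance(e, Lit):
        return e.value
    if isinstance(e, Col):
        return row[e.name]
    return evaluate(e.left, row) * evaluate(e.right, row)


print(fold(Mul(Lit(0.18), Lit(100))))                # Lit(value=18.0)
print(fold(Mul(Col("price"), Mul(Lit(2), Lit(3)))))  # Mul(left=Col(name='price'), right=Lit(value=6))
```

Folding a subtree that mixes a column with literals leaves the column part for per-row evaluation and collapses only the literal part.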

Lazy Evaluation in Practice

df = spark.read.parquet("employees.parquet")
# Common mistake: print doesn't trigger execution
transformed = df.filter(F.col("salary") > 80000)
print(transformed)  # Only prints the column names and types; nothing runs
# Only actions trigger execution
print(transformed.count())  # NOW it runs
transformed.show()  # Another job is submitted, which re-reads the source
# Cache to avoid re-reading data across multiple actions
df.cache()  # Also lazy: only marks df for caching
df.filter(F.col("salary") > 80000).count()  # Job 1: reads the file and fills the cache
df.filter(F.col("department") == "Eng").show()  # Job 2: reads from the cache
df.unpersist()  # Release the cached data when done
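The effect of cache() on the number of reads can be modeled with a toy class in plain Python. CachedSource is a hypothetical name; real Spark caching stores partitions in executor memory or on disk according to a storage level, but the counting shows the same pattern: marking is lazy, the first action fills the cache, and later actions reuse it.

```python
# Toy model of cache() (illustrative only, not Spark's storage layer).
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


class CachedSource:
    def __init__(self, read: Callable[[], List[Row]]):
        self._read = read
        self._marked = False
        self._data: Optional[List[Row]] = None
        self.reads = 0  # how many times the underlying source was read

    def cache(self) -> "CachedSource":
        self._marked = True  # lazy: nothing is read here
        return self

    def unpersist(self) -> None:
        self._marked = False
        self._data = None

    def rows(self) -> List[Row]:
        if self._data is not None:
            return self._data  # served from the cache, no read
        self.reads += 1
        data = self._read()
        if self._marked:
            self._data = data  # the first action populates the cache
        return data


employees = CachedSource(lambda: [
    {"salary": 90_000, "department": "Eng"},
    {"salary": 60_000, "department": "Ops"},
    {"salary": 85_000, "department": "Eng"},
])
employees.cache()
print(employees.reads)                                                 # 0
print(sum(1 for r in employees.rows() if r["salary"] > 80_000))        # 2 (job 1: reads)
print(len([r for r in employees.rows() if r["department"] == "Eng"]))  # 2 (job 2: cache)
print(employees.reads)                                                 # 1
employees.unpersist()
```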

When Lazy Evaluation Doesn’t Help

# Schema inference forces an immediate read
df = spark.read.option("inferSchema", True).csv("data.csv")
# ← Spark runs a job RIGHT NOW that scans the data (all of it, by default)
#   to work out the column types
# Solution: provide an explicit schema
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([
    StructField("product", StringType()),
    StructField("revenue", DoubleType()),
])
df = spark.read.schema(schema).csv("data.csv")  # No job runs until an action
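A toy in plain Python shows why inference needs a pass over the data while an explicit schema does not. The type rule here (a column is double if every value parses as a float, otherwise string) is a simplifying assumption, far cruder than Spark's CSV inference, and infer_schema and read_with_schema are hypothetical names.

```python
# Toy contrast between schema inference and an explicit schema
# (illustrative only; Spark's CSV inference handles many more types and options).
import csv
import io
from typing import Dict, Iterator, List, Tuple

CSV_TEXT = "widget,19.5\ngadget,7\nwidget,3.25\n"
rows_scanned = 0  # rows read BEFORE any query runs


def looks_numeric(value: str) -> bool:
    try:
        float(value)
        return True
    except ValueError:
        return False


def infer_schema(text: str) -> List[str]:
    # Must read every row up front to decide each column's type
    global rows_scanned
    numeric: List[bool] = []
    for row in csv.reader(io.StringIO(text)):
        rows_scanned += 1
        if not numeric:
            numeric = [True] * len(row)
        for i, value in enumerate(row):
            numeric[i] = numeric[i] and looks_numeric(value)
    return ["double" if is_num else "string" for is_num in numeric]


def read_with_schema(text: str, schema: List[Tuple[str, str]]) -> Iterator[Dict[str, object]]:
    # Generator: no row is parsed until a caller iterates (the "action")
    casts = {"string": str, "double": float}
    for row in csv.reader(io.StringIO(text)):
        yield {name: casts[kind](value) for (name, kind), value in zip(schema, row)}


print(infer_schema(CSV_TEXT), rows_scanned)  # ['string', 'double'] 3

schema = [("product", "string"), ("revenue", "double")]
lazy_rows = read_with_schema(CSV_TEXT, schema)  # nothing parsed yet
print(sum(r["revenue"] for r in lazy_rows))     # 29.75
```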