Spark Lazy Evaluation
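Lazy evaluation means Spark does not execute a transformation at the moment you write it. Instead, each transformation is recorded as a step in a logical plan. Only when an action (such as count(), collect(), show(), or a write) is called does Spark optimize that plan, compile it into physical execution, and run it. This is one of the most important design decisions in Spark, and a major reason Spark can be faster than hand-written MapReduce jobs: because the whole plan is known before any data is touched, Spark can skip unneeded data and combine steps that an eager program would run one at a time.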
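The record-then-run behavior can be modeled in a few lines of plain Python. This is a toy sketch, not Spark's API: `LazyList`, `plan()`, and `collect()` are hypothetical names chosen to echo Spark's. Transformations only append a step to a tuple, and the source is read only when the action runs.

```python
class LazyList:
    """Toy lazy collection: transformations are recorded, actions execute them."""

    def __init__(self, source, steps=()):
        self._source = source       # zero-argument callable that produces the data
        self._steps = tuple(steps)  # recorded (kind, function) pairs; nothing runs yet

    # Transformations: return a NEW LazyList with one more recorded step.
    def map(self, fn):
        return LazyList(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        return LazyList(self._source, self._steps + (("filter", pred),))

    def plan(self):
        """Describe the recorded steps without running them (a tiny explain())."""
        return ["source"] + [kind for kind, _ in self._steps]

    # Action: only here is the source read and every recorded step applied.
    def collect(self):
        rows = self._source()
        for kind, fn in self._steps:
            rows = map(fn, rows) if kind == "map" else filter(fn, rows)
        return list(rows)


reads = []  # one entry per time the "file" is actually read

def read_logs():
    reads.append(1)
    return ["INFO ok", "ERROR disk full", "ERROR timeout"]

errors = LazyList(read_logs).filter(lambda l: "ERROR" in l).map(lambda l: l.split(" ", 1)[1])
print(errors.plan())     # ['source', 'filter', 'map']
print(len(reads))        # 0: building the pipeline read nothing
print(errors.collect())  # ['disk full', 'timeout']
print(len(reads))        # 1: the action read the source once
```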
How It Works: The DAG
```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("Lazy Demo").getOrCreate()
sc = spark.sparkContext

# Every line below returns INSTANTLY: no data is read or processed yet
rdd = sc.textFile("s3://bucket/weblogs/*.log")               # Step 1: registered
errors = rdd.filter(lambda l: "ERROR" in l)                  # Step 2: registered
# Join the words back into one string: distinct() needs hashable records, and lists are not
messages = errors.map(lambda l: " ".join(l.split(" ")[2:]))  # Step 3: registered
unique_messages = messages.distinct()                        # Step 4: registered

# ← All four transformations form a DAG (Directed Acyclic Graph).
# NOTHING has run yet.

# This action submits the DAG to the scheduler: NOW it runs
count = unique_messages.count()  # ← Triggers execution
print(f"Unique error types: {count}")
```
Visualizing the DAG
You can inspect Spark’s DAG in the Spark UI (served on port 4040 by default while the application is running) or, for DataFrames, programmatically with explain():
```python
# Build a DataFrame: this only extends the plan, nothing is read yet
df = spark.read.parquet("employees.parquet") \
    .filter(F.col("salary") > 80000) \
    .groupBy("department") \
    .agg(F.avg("salary").alias("avg_salary"))

# Print the plans (explain() itself does not run the query)
df.explain()                  # Physical plan only
df.explain(mode="extended")   # Parsed, analyzed, and optimized logical plans + physical plan
df.explain(mode="formatted")  # Physical plan outline plus per-node details
```
Sample output:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[department#12], functions=[avg(salary#14)])
   +- Exchange hashpartitioning(department#12, 200), ENSURE_REQUIREMENTS
      +- HashAggregate(keys=[department#12], functions=[partial_avg(salary#14)])
         +- Filter (salary#14 > 80000)
            +- FileScan parquet [department#12,salary#14] PushedFilters=[IsNotNull(salary), GreaterThan(salary,80000.0)]
```
Notice PushedFilters: Catalyst pushed the filter down to the Parquet reader, which can use each row group's min/max statistics to skip row groups that cannot contain a matching row. Rows that are read still pass through the Filter operator, which is why it remains in the plan. The FileScan also lists only department and salary, so no other columns are read from disk.
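A toy model of what statistics-based skipping buys, assuming each row group carries min/max statistics for `salary` (as Parquet footers typically do). `RowGroup` and `scan_with_pushdown` are hypothetical names; a real reader works on encoded column chunks, not Python tuples.

```python
from dataclasses import dataclass


@dataclass
class RowGroup:
    """Hypothetical stand-in for a Parquet row group plus its footer statistics."""
    min_salary: float
    max_salary: float
    rows: list  # (department, salary) tuples


def scan_with_pushdown(row_groups, threshold):
    """Toy scan for `salary > threshold`; returns (matching rows, row groups read)."""
    out, groups_read = [], 0
    for rg in row_groups:
        # For a ">" predicate only the maximum matters: if even the largest
        # salary fails, no row can match, so the group is skipped unread.
        if rg.max_salary <= threshold:
            continue
        groups_read += 1
        # The group MAY contain matches, so it is read and every row is still
        # checked (the job of the Filter node that stays in the plan).
        out.extend(row for row in rg.rows if row[1] > threshold)
    return out, groups_read


groups = [
    RowGroup(40_000, 75_000, [("Ops", 40_000.0), ("Ops", 75_000.0)]),
    RowGroup(60_000, 120_000, [("Eng", 60_000.0), ("Eng", 120_000.0)]),
    RowGroup(90_000, 150_000, [("Eng", 90_000.0), ("Sales", 150_000.0)]),
]
rows, read = scan_with_pushdown(groups, 80_000)
print(read)  # 2: the first group is ruled out by its statistics alone
print(rows)  # [('Eng', 120000.0), ('Eng', 90000.0), ('Sales', 150000.0)]
```

Note that the second group is read even though half of its rows fail the predicate; statistics can only rule out whole groups.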
What the Optimizer Does With Lazy Plans
Because Spark has the complete plan before executing, it can apply several optimizations automatically:
1. Predicate Pushdown
```python
# You write this:
df = spark.read.parquet("sales.parquet") \
    .filter(F.col("year") == 2025) \
    .select("product", "revenue")

# Spark executes this:
# Read only the 'year', 'product', and 'revenue' columns (column pruning;
#   'year' is still needed to evaluate the filter)
# Skip row groups whose statistics show they contain no 2025 rows
# Apply the year filter to the remaining rows before anything downstream sees them
```
2. Operator Fusion (Narrow Transformations)
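```python
df = spark.read.parquet("logs.parquet") \
    .filter(F.col("level") == "ERROR") \
    .withColumn("msg_upper", F.upper(F.col("message"))) \
    .select("timestamp", "msg_upper")
```
All three narrow transformations above run in a single stage. Catalyst merges the withColumn and select into one projection, and whole-stage code generation compiles the filter and projection into one function, so each row is filtered, transformed, and projected in a single pass over the data instead of three.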
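The effect of fusion can be sketched in plain Python. This toy uses hypothetical `fuse` and `execute` helpers, not Spark internals: each narrow operator is a function from one row to zero or one rows, and the sketch compares running the operators one full pass at a time with composing them into a single per-row function.

```python
def fuse(ops):
    """Compose narrow operators into a single per-row function."""
    def fused(row):
        current = [row]
        for op in ops:
            current = [out for r in current for out in op(r)]
        return current
    return fused


def execute(rows, ops, fused=True):
    """Run ops over rows; return (result, number of full passes over the data)."""
    stages = [fuse(ops)] if fused else list(ops)
    passes = 0
    for stage in stages:
        # One pass: every row goes through this stage and the output is materialized.
        rows = [out for row in rows for out in stage(row)]
        passes += 1
    return rows, passes


def keep_errors(r):  # filter(level == "ERROR")
    return [r] if r["level"] == "ERROR" else []

def add_upper(r):    # withColumn("msg_upper", upper(message))
    return [{**r, "msg_upper": r["message"].upper()}]

def project(r):      # select("timestamp", "msg_upper")
    return [{"timestamp": r["timestamp"], "msg_upper": r["msg_upper"]}]


logs = [
    {"timestamp": 1, "level": "INFO", "message": "ok"},
    {"timestamp": 2, "level": "ERROR", "message": "disk full"},
]
ops = [keep_errors, add_upper, project]
print(execute(logs, ops, fused=False))  # ([{'timestamp': 2, 'msg_upper': 'DISK FULL'}], 3)
print(execute(logs, ops, fused=True))   # ([{'timestamp': 2, 'msg_upper': 'DISK FULL'}], 1)
```

Both strategies return the same rows; only the number of passes (and intermediate lists) differs, which is the point of fusing narrow operators.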
3. Constant Folding
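Constant folding means evaluating any sub-expression whose inputs are all literals once, while the plan is being optimized, instead of once per row. A toy version on a tiny expression tree is sketched below; `Lit`, `Col`, `Mul`, and `fold` are hypothetical stand-ins, while Catalyst's own `ConstantFolding` rule works on its Scala expression classes and covers many more operators.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Lit:
    value: float


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Mul:
    left: "Expr"
    right: "Expr"


Expr = Union[Lit, Col, Mul]


def fold(e: Expr) -> Expr:
    """Bottom-up: replace every Mul whose operands are both literals by one literal."""
    if isinstance(e, Mul):
        left, right = fold(e.left), fold(e.right)
        if isinstance(left, Lit) and isinstance(right, Lit):
            return Lit(left.value * right.value)  # computed once, at "plan time"
        return Mul(left, right)
    return e  # literals and column references are left as they are


print(fold(Mul(Lit(0.18), Lit(100))))
# Lit(value=18.0)
print(fold(Mul(Col("price"), Mul(Lit(0.18), Lit(100)))))
# Mul(left=Col(name='price'), right=Lit(value=18.0))
```

The second call shows that folding still helps when a column is involved: only the literal-only subtree collapses, and the column reference remains for runtime.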
```python
# You write:
df.withColumn("tax_rate", F.lit(0.18) * F.lit(100))

# The optimizer computes 0.18 * 100 = 18.0 once, at plan time.
# No multiplication happens per row at runtime.
```
Lazy Evaluation in Practice
```python
# Common mistake: print() doesn't trigger execution
transformed = df.filter(F.col("salary") > 80000)
print(transformed)  # Prints only the schema, as DataFrame[col: type, ...]; nothing runs

# Only actions trigger execution
print(transformed.count())  # NOW it runs
transformed.show()          # Another job is submitted, recomputing from the source
```
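Without caching, every action replays the full lineage from the source. The toy below models that with a hypothetical `Dataset` class (not PySpark's): `cache()` only sets a flag, the first action materializes the rows, and later actions reuse them. Real Spark caches partitions on executors and can evict them under memory pressure, which this sketch ignores.

```python
class Dataset:
    """Toy dataset whose lineage is replayed by every action unless cached."""

    def __init__(self, compute):
        self._compute = compute  # zero-argument function producing the rows
        self._cache_requested = False
        self._cached = None

    def filter(self, pred):  # transformation: just wraps the parent lineage
        return Dataset(lambda: [r for r in self._rows() if pred(r)])

    def cache(self):
        # Like Spark's cache(), this only marks the dataset; nothing is stored yet.
        self._cache_requested = True
        return self

    def _rows(self):
        if self._cached is not None:
            return self._cached  # served from the cache
        rows = self._compute()   # replay the lineage
        if self._cache_requested:
            self._cached = rows  # the first action materializes the cache
        return rows

    def count(self):  # action
        return len(self._rows())


reads = []  # one entry per read of the underlying source

def read_source():
    reads.append(1)
    return [50_000, 85_000, 120_000]

high = Dataset(read_source).filter(lambda s: s > 80_000)
high.count()
high.count()
print(len(reads))  # 2: each action re-read the source

reads.clear()
cached = Dataset(read_source).cache()
cached.filter(lambda s: s > 80_000).count()
cached.filter(lambda s: s < 60_000).count()
print(len(reads))  # 1: the second action was served from the cache
```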
```python
# Cache to avoid re-reading data across multiple actions
df.cache()                                  # Also lazy: only marks df for caching
df.filter(F.col("salary") > 80000).count()  # Job 1: reads the source and fills the cache
df.filter(F.col("dept") == "Eng").show()    # Job 2: served from the cache
df.unpersist()                              # Release the cached data
```
When Lazy Evaluation Doesn’t Help
```python
# Schema inference forces an immediate read
df = spark.read.option("inferSchema", True).csv("data.csv")
# ← Spark launches a job RIGHT NOW that scans the file to work out column types
#   (the samplingRatio option can limit how much of it is scanned)

# Solution: always provide an explicit schema
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("product", StringType()),
    StructField("revenue", DoubleType()),
])
df = spark.read.schema(schema).csv("data.csv")  # Truly lazy: no job until an action
```
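The same trade-off appears in a toy reader: inference needs to see values, so it has to read the data at construction time, while a caller-supplied schema lets reading wait for an action. `LazyCsv` and its string type names are hypothetical, and the inference rule (a column is numeric if every value parses as a float) is a simplification of what Spark does.

```python
class LazyCsv:
    """Toy CSV reader: without a schema it must read the data to infer one."""

    def __init__(self, load_lines, schema=None):
        self._load = load_lines  # zero-argument function returning text lines
        if schema is None:
            # Inference has to look at the values, so the data is read NOW,
            # before any action (the analogue of inferSchema=True).
            schema = self._infer(self._load())
        self.schema = schema     # e.g. ["string", "double"]

    @staticmethod
    def _infer(lines):
        """Simplified rule: a column is "double" if every value parses as a float."""
        columns = zip(*(line.split(",") for line in lines))
        types = []
        for values in columns:
            try:
                for v in values:
                    float(v)
                types.append("double")
            except ValueError:
                types.append("string")
        return types

    def collect(self):
        """Action: read the data and convert each value using the schema."""
        convert = {"double": float, "string": str}
        return [[convert[t](v) for t, v in zip(self.schema, line.split(","))]
                for line in self._load()]


csv_reads = []  # one entry per read of the underlying "file"

def load():
    csv_reads.append(1)
    return ["widget,9.5", "gadget,12.0"]

inferred = LazyCsv(load)
print(inferred.schema, len(csv_reads))  # ['string', 'double'] 1  (read at construction)

explicit = LazyCsv(load, schema=["string", "double"])
print(len(csv_reads))                   # 1  (the explicit schema read nothing)
print(explicit.collect())               # [['widget', 9.5], ['gadget', 12.0]]
```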