Spark Broadcast Variables

A broadcast variable lets you distribute a read-only data object to all executor nodes efficiently — shipped once per executor rather than with every task. This is critical when tasks need access to a shared lookup table, ML model, or configuration object.


The Problem Without Broadcast

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Broadcast").getOrCreate()
sc = spark.sparkContext
# A product lookup table on the driver (assume it is about 50 MB once serialized)
product_names = {f"P{i:04d}": f"Product {i}" for i in range(100000)}
rdd = sc.textFile("orders.txt")  # Millions of lines, each assumed to hold a product ID
# BAD: product_names is captured in the lambda's closure, so Spark pickles and ships it with EVERY task
# If there are 500 tasks → 500 × 50 MB = 25 GB sent over the network
enriched = rdd.map(lambda product_id: product_names.get(product_id, "Unknown"))
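
Whether this matters depends on how large the captured object really is, so it is worth measuring rather than guessing. A minimal sketch, assuming the standard-library pickle module is a close enough proxy for the serializer PySpark uses for closures; the helper name pickled_size_mb is hypothetical:

```python
import pickle

def pickled_size_mb(obj) -> float:
    """Approximate serialized size of obj in MB (1 MB = 1024 * 1024 bytes)."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)) / (1024 * 1024)

# Same shape as the lookup table above
product_names = {f"P{i:04d}": f"Product {i}" for i in range(100000)}

size_mb = pickled_size_mb(product_names)
print(f"product_names pickles to about {size_mb:.1f} MB")
```

Running the same helper on the real lookup table gives the per-copy payload to plug into the task-count arithmetic.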

Broadcasting the Lookup Table

# GOOD: ship once per executor — typically 10-50 executors
broadcast_names = sc.broadcast(product_names)
# Access via .value inside tasks
enriched = rdd.map(lambda product_id: broadcast_names.value.get(product_id, "Unknown"))
# Each executor receives the dict ONCE and reuses it across all its tasks
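
The saving is easy to put numbers on. The sketch below reuses the figures from the comments above (500 tasks, a 50 MB table, 10 to 50 executors) and ignores compression and protocol overhead, so treat the results as rough estimates; the function name transfer_gb is hypothetical:

```python
def transfer_gb(copies: int, payload_mb: float) -> float:
    """Total data shipped in GB (decimal: 1 GB = 1000 MB) for `copies` copies of the payload."""
    return copies * payload_mb / 1000

PAYLOAD_MB = 50
per_task = transfer_gb(500, PAYLOAD_MB)          # one copy per task
per_executor_low = transfer_gb(10, PAYLOAD_MB)   # one copy per executor, 10 executors
per_executor_high = transfer_gb(50, PAYLOAD_MB)  # one copy per executor, 50 executors

print(per_task)           # 25.0
print(per_executor_low)   # 0.5
print(per_executor_high)  # 2.5
```

With these numbers broadcasting cuts traffic by a factor of 10 to 50, and the gap widens as the task count grows because the broadcast cost depends only on the number of executors.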

Broadcast Joins in DataFrame API

Broadcast joins apply the same idea to DataFrames: when one side of a join is small, Spark copies it to every executor, so the large side does not need to be shuffled:

from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast
# Large fact table (billions of rows)
orders = spark.read.parquet("s3://bucket/orders/")
# Small dimension table (thousands of rows)
products = spark.read.parquet("s3://bucket/products/")
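# BAD: without a hint, Spark uses a sort-merge join (both tables shuffle)
# unless it estimates products to be below the auto-broadcast threshold
orders.join(products, "product_id")
# GOOD: products is explicitly broadcast, so orders is not shuffled
orders.join(broadcast(products), "product_id")
# Spark auto-broadcasts tables whose estimated size is below this threshold (default: 10 MB)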
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "52428800") # 50 MB
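
The threshold is given in bytes, which is easy to get wrong by a factor of 1,000 or 1,024. A small hypothetical helper (not part of PySpark) that converts megabytes into the string passed to spark.conf.set might look like this; setting the threshold to -1 turns auto-broadcast off:

```python
from typing import Optional

def broadcast_threshold(megabytes: Optional[int]) -> str:
    """Return a value for spark.sql.autoBroadcastJoinThreshold.

    None disables auto-broadcast (Spark treats -1 as "off").
    """
    if megabytes is None:
        return "-1"
    if megabytes < 0:
        raise ValueError("megabytes must be non-negative")
    return str(megabytes * 1024 * 1024)

print(broadcast_threshold(50))    # 52428800
print(broadcast_threshold(10))    # 10485760
print(broadcast_threshold(None))  # -1
```

The result would then be passed as spark.conf.set("spark.sql.autoBroadcastJoinThreshold", broadcast_threshold(50)).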

Broadcast in RDD API — Common Patterns

Pattern 1: Enrichment Lookup

country_lookup = {
    "US": ("United States", "USD"),
    "GB": ("United Kingdom", "GBP"),
    "DE": ("Germany", "EUR"),
}
bc_lookup = sc.broadcast(country_lookup)
transactions = sc.parallelize([
    ("T001", "US", 100), ("T002", "DE", 250), ("T003", "GB", 75),
])
enriched = transactions.map(lambda t: (
    t[0],
    bc_lookup.value.get(t[1], ("Unknown", "N/A")),
    t[2]
))
enriched.collect()
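
To see what enriched.collect() should return, the mapping function can be run on plain Python data. This sketch swaps the broadcast variable for an ordinary dict (standing in for bc_lookup.value) so it runs without a cluster; the enrich helper and the extra "T004" record with an unknown country code are hypothetical additions:

```python
country_lookup = {
    "US": ("United States", "USD"),
    "GB": ("United Kingdom", "GBP"),
    "DE": ("Germany", "EUR"),
}

def enrich(t, lookup):
    """Same logic as the lambda above: (id, code, amount) -> (id, (name, currency), amount)."""
    return (t[0], lookup.get(t[1], ("Unknown", "N/A")), t[2])

transactions = [
    ("T001", "US", 100), ("T002", "DE", 250), ("T003", "GB", 75),
    ("T004", "FR", 40),  # hypothetical record whose code is missing from the lookup
]
enriched = [enrich(t, country_lookup) for t in transactions]
for row in enriched:
    print(row)
# ('T001', ('United States', 'USD'), 100)
# ('T002', ('Germany', 'EUR'), 250)
# ('T003', ('United Kingdom', 'GBP'), 75)
# ('T004', ('Unknown', 'N/A'), 40)
```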

Pattern 2: Broadcast a Filter Set

# Blocked user IDs: a set gives constant-time membership checks inside the filter
blocked_users = {10234, 58921, 77432, 99001}
bc_blocked = sc.broadcast(blocked_users)
active_users = sc.parallelize(range(100000)) \
    .filter(lambda user_id: user_id not in bc_blocked.value)
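
Using a set rather than a list matters here because the membership test runs once per record. A plain-Python sketch of the same predicate, with an ordinary set standing in for bc_blocked.value and a hypothetical is_active helper, shows the expected count:

```python
blocked_users = {10234, 58921, 77432, 99001}

def is_active(user_id, blocked):
    """Same predicate as the filter lambda above; set lookup is O(1) on average."""
    return user_id not in blocked

active_users = [u for u in range(100000) if is_active(u, blocked_users)]
print(len(active_users))  # 99996
```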

Pattern 3: Broadcast ML Model for Scoring

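import pickle
# Load model on driver
with open("model.pkl", "rb") as f:
    model = pickle.load(f)
# Ship the model once per executor instead of once per task
bc_model = sc.broadcast(model)
features_rdd = spark.read.parquet("features.parquet").rdd
# Assumes a scikit-learn-style model whose predict() takes a batch of feature vectors
predictions = features_rdd.map(
    lambda row: (row["user_id"], bc_model.value.predict([row["features"]])[0])
)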
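
Calling predict once per row can be slow for models that are built for batch input. One alternative, which is a different technique from the per-row map above, is to score each partition in batches with mapPartitions. The sketch below assumes a scikit-learn-style predict method; score_partition, batch_size, and the DummyModel stand-in are hypothetical names used only for illustration:

```python
from itertools import islice

def score_partition(rows, model, batch_size=1000):
    """Yield (user_id, prediction) pairs, calling model.predict once per batch."""
    rows = iter(rows)
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            return
        preds = model.predict([row["features"] for row in batch])
        for row, pred in zip(batch, preds):
            yield (row["user_id"], pred)

class DummyModel:
    """Stand-in for a real model: predicts the sum of each feature vector."""
    def predict(self, X):
        return [sum(x) for x in X]

rows = [{"user_id": i, "features": [i, 1.0]} for i in range(5)]
print(list(score_partition(rows, DummyModel(), batch_size=2)))
# [(0, 1.0), (1, 2.0), (2, 3.0), (3, 4.0), (4, 5.0)]
```

On a cluster this would be wired up as features_rdd.mapPartitions(lambda rows: score_partition(rows, bc_model.value)), so the broadcast model is still fetched once per executor.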

Lifecycle Management

# Unpersist: remove from executor memory; can be re-fetched if needed
broadcast_names.unpersist()
# Destroy: permanently remove all data and metadata from the driver and executors
broadcast_names.destroy()
# After destroy, the broadcast cannot be used again; jobs that still reference it fail
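
To make sure cleanup happens even when a job raises, the broadcast can be tied to a with block. A minimal sketch, assuming only the sc.broadcast() and destroy() calls shown above; the broadcasting helper and the two Fake classes are hypothetical and exist only so the example runs without a cluster:

```python
from contextlib import contextmanager

@contextmanager
def broadcasting(sc, value):
    """Broadcast `value`, then destroy the broadcast when the with block exits."""
    bc = sc.broadcast(value)
    try:
        yield bc
    finally:
        bc.destroy()

# Minimal stand-ins so the sketch runs without Spark
class FakeBroadcast:
    def __init__(self, value):
        self.value = value
        self.destroyed = False

    def destroy(self):
        self.destroyed = True

class FakeContext:
    def broadcast(self, value):
        return FakeBroadcast(value)

with broadcasting(FakeContext(), {"US": "United States"}) as bc:
    name = bc.value["US"]  # use the broadcast inside the block

print(name, bc.destroyed)  # United States True
```

Spark transformations are lazy, so any action that uses the broadcast (collect(), count(), a write) has to run inside the with block; otherwise the job would start after the data has already been destroyed.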

Size Guidelines

Data Size       Approach
< 10 MB         Auto-broadcast via threshold
10 – 200 MB     Manual broadcast()
> 200 MB        Don’t broadcast; use a sort-merge join
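
The guidelines can also be encoded as a small function. This is a sketch with a hypothetical name, using the boundaries exactly as listed (10 MB falls into the manual bucket and 200 MB stays in it); the right limits in practice depend on executor memory:

```python
def join_strategy(small_side_mb: float) -> str:
    """Map the size of the smaller join side to the approach in the table above."""
    if small_side_mb < 10:
        return "auto-broadcast via threshold"
    if small_side_mb <= 200:
        return "manual broadcast()"
    return "sort-merge join"

for size in (2, 50, 500):
    print(size, "MB ->", join_strategy(size))
# 2 MB -> auto-broadcast via threshold
# 50 MB -> manual broadcast()
# 500 MB -> sort-merge join
```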