Spark Broadcast Variables
A broadcast variable lets you distribute a read-only data object to all executor nodes efficiently — shipped once per executor rather than with every task. This is critical when tasks need access to a shared lookup table, ML model, or configuration object.
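The saving is easy to estimate. The sketch below is a back-of-the-envelope model, not a Spark API. It assumes a naive worst case in which every task receives its own copy of a captured variable, while a broadcast value is fetched once per executor. The 50 MB payload and the task and executor counts are the illustrative figures used later in this article. Real Spark distributes broadcast blocks peer-to-peer between executors, so the driver's own outbound traffic is lower still.

```python
def closure_shipping_mb(num_tasks: int, payload_mb: float) -> float:
    """Naive worst case: every task carries its own serialized copy of the variable."""
    return num_tasks * payload_mb


def broadcast_shipping_mb(num_executors: int, payload_mb: float) -> float:
    """Broadcast: each executor fetches the value once and shares it across its tasks."""
    return num_executors * payload_mb


payload_mb = 50  # illustrative lookup-table size
print(f"closure, 500 tasks: {closure_shipping_mb(500, payload_mb):,.0f} MB")
for executors in (10, 50):
    print(f"broadcast, {executors} executors: {broadcast_shipping_mb(executors, payload_mb):,.0f} MB")
```

Even at 50 executors, broadcasting moves a tenth of the data that the per-task model would.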
The Problem Without Broadcast
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Broadcast").getOrCreate()
sc = spark.sparkContext

# Product lookup table built on the driver (a stand-in for a large table, e.g. 50 MB)
product_names = {f"P{i:04d}": f"Product {i}" for i in range(100000)}

rdd = sc.textFile("orders.txt")  # Millions of orders; assumed to hold one product ID per line

# BAD: product_names is captured in the task closure, so every task
# carries and deserializes its own copy.
# Worst case with 500 tasks: 500 × 50 MB = 25 GB sent over the network
enriched = rdd.map(lambda product_id: product_names.get(product_id, "Unknown"))
```

Broadcasting the Lookup Table
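Before broadcasting a value, it is worth checking how large it is once serialized, because the serialized bytes are what Spark stores and ships. Broadcast values in PySpark are pickled, so `pickle.dumps` gives a reasonable local estimate. This sketch uses only the standard library, and the exact figure depends on the Python version and pickle protocol. For the `product_names` dictionary built above, the result is a few megabytes, so treat the 50 MB figure as a stand-in for a larger production table.

```python
import pickle


def pickled_size_mb(obj) -> float:
    """Approximate broadcast payload: size of obj after pickling, in MiB."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)) / (1024 * 1024)


# Same construction as the lookup table in the example above
product_names = {f"P{i:04d}": f"Product {i}" for i in range(100000)}
print(f"{pickled_size_mb(product_names):.1f} MiB")
```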
```python
# GOOD: ship once per executor (typically 10-50 executors)
broadcast_names = sc.broadcast(product_names)

# Access via .value inside tasks
enriched = rdd.map(lambda product_id: broadcast_names.value.get(product_id, "Unknown"))

# Each executor receives the dict ONCE and reuses it across all its tasks
```

Broadcast Joins in DataFrame API
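A broadcast hash join has two phases. First, it builds a hash table from the small side once. Then each partition of the large side probes its own copy of that table locally, so the large side never moves between executors. The pure-Python model below illustrates the idea, with lists standing in for partitions. `broadcast_hash_join` and the sample rows are hypothetical and are not Spark internals.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner-join every partition of the large side against one shared hash table.

    large_partitions: list of partitions, each a list of dict rows (they stay in place)
    small_rows: list of dict rows from the small side (the part that gets broadcast)
    """
    # Build phase: done once, then shared with every partition (the "broadcast")
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)

    # Probe phase: each partition is joined locally, so no rows move between partitions
    return [
        [{**big, **small} for big in part for small in table.get(big[key], [])]
        for part in large_partitions
    ]


orders = [
    [{"order_id": 1, "product_id": "P1"}, {"order_id": 2, "product_id": "P2"}],
    [{"order_id": 3, "product_id": "P1"}, {"order_id": 4, "product_id": "P9"}],
]
products = [{"product_id": "P1", "name": "Widget"}, {"product_id": "P2", "name": "Gadget"}]

for i, part in enumerate(broadcast_hash_join(orders, products, "product_id")):
    print(i, part)
```

Order 4 disappears because this is an inner join, which matches the default `how="inner"` of `DataFrame.join`.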
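Broadcasting also removes shuffles from joins. When one side of a join is small, Spark can send a full copy of it to every executor (a broadcast hash join), so the large side is joined in place instead of being shuffled: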
```python
from pyspark.sql.functions import broadcast

# Large fact table (billions of rows)
orders = spark.read.parquet("s3://bucket/orders/")

# Small dimension table (thousands of rows)
products = spark.read.parquet("s3://bucket/products/")

# BAD: without a hint, and if products exceeds the auto-broadcast threshold,
# Spark typically plans a sort-merge join that shuffles both tables
orders.join(products, "product_id")

# GOOD: products is broadcast, so orders is not shuffled
orders.join(broadcast(products), "product_id")

# Tables whose estimated size is below the threshold (default: 10 MB) are
# broadcast automatically; the value is given in bytes
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "52428800")  # 50 MB
```

Broadcast in RDD API: Common Patterns
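The patterns in this section all rely on the same behaviour on the executor side. The first task that touches `.value` deserializes the payload, and later tasks in the same process reuse the cached object. The toy model below illustrates that caching. `ToyBroadcast` is hypothetical and does not reflect how Spark is implemented. In PySpark, the cached copy lives in each Python worker process, so an executor running several workers may hold several copies.

```python
import pickle

_UNLOADED = object()  # sentinel, so a broadcast value of None is still cached correctly


class ToyBroadcast:
    """Hypothetical model of a broadcast value as seen from one worker process."""

    def __init__(self, value):
        self._payload = pickle.dumps(value)  # bytes fetched from the driver once
        self._value = _UNLOADED
        self.loads = 0  # how many times the payload was deserialized

    @property
    def value(self):
        # Deserialize lazily on first access, then reuse the cached object
        if self._value is _UNLOADED:
            self._value = pickle.loads(self._payload)
            self.loads += 1
        return self._value


bc = ToyBroadcast({"US": "United States", "DE": "Germany"})

# Four "tasks" running in the same worker all read bc.value
for task_records in (["US"], ["DE", "US"], ["GB"], ["DE"]):
    print([bc.value.get(code, "Unknown") for code in task_records])

print("deserializations:", bc.loads)  # 1
```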
Pattern 1: Enrichment Lookup
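```python
country_lookup = {
    "US": ("United States", "USD"),
    "GB": ("United Kingdom", "GBP"),
    "DE": ("Germany", "EUR"),
}
bc_lookup = sc.broadcast(country_lookup)

transactions = sc.parallelize([
    ("T001", "US", 100),
    ("T002", "DE", 250),
    ("T003", "GB", 75),
])

# Replace each country code with its (name, currency) tuple
enriched = transactions.map(lambda t: (
    t[0],
    bc_lookup.value.get(t[1], ("Unknown", "N/A")),
    t[2],
))
enriched.collect()
# [('T001', ('United States', 'USD'), 100),
#  ('T002', ('Germany', 'EUR'), 250),
#  ('T003', ('United Kingdom', 'GBP'), 75)]
```

Pattern 2: Broadcast a Filter Set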
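By contract, broadcast values are read-only. If a task mutates its copy, the change stays local to that worker and never reaches the driver or the other executors. Freezing the set before broadcasting is a suggestion, not something Spark requires, but it turns an accidental mutation into an immediate error. The predicate is plain Python, so it can also be checked locally before it runs on the cluster. The sketch below does both, without Spark.

```python
# Freeze the filter set so accidental mutation fails loudly instead of
# silently diverging between workers
blocked_users = frozenset({10234, 58921, 77432, 99001})


def is_active(user_id: int) -> bool:
    """Same predicate as the cluster filter; membership is an average O(1) hash lookup."""
    return user_id not in blocked_users


active = [u for u in range(100000) if is_active(u)]
print(len(active))  # 99996

try:
    blocked_users.add(12345)  # frozenset has no add(), so this raises AttributeError
except AttributeError as exc:
    print("mutation rejected:", exc)
```

On the cluster, the same frozenset can be passed to `sc.broadcast()` exactly as a regular set is.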
```python
# Blocked user IDs: a small set, so each membership test is an O(1) hash lookup
blocked_users = {10234, 58921, 77432, 99001}
bc_blocked = sc.broadcast(blocked_users)

active_users = sc.parallelize(range(100000)) \
    .filter(lambda user_id: user_id not in bc_blocked.value)
```

Pattern 3: Broadcast ML Model for Scoring
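A model can only be broadcast if it pickles cleanly, and objects that hold open connections, locks, or file handles usually do not. Running a round-trip check on the driver catches this problem before any job runs. `ThresholdModel` is a hypothetical stand-in for a real estimator with a scikit-learn-style `predict()`. `assert_broadcastable` is an illustrative helper, not a Spark function.

```python
import pickle


class ThresholdModel:
    """Hypothetical stand-in for a trained model with a predict(rows) method."""

    def __init__(self, threshold: float):
        self.threshold = threshold

    def predict(self, rows):
        # One prediction per feature row: 1 if the feature sum exceeds the threshold
        return [int(sum(features) > self.threshold) for features in rows]


def assert_broadcastable(obj, sample_rows):
    """Pickle round-trip check: the copy must load and predict like the original."""
    restored = pickle.loads(pickle.dumps(obj))
    if restored.predict(sample_rows) != obj.predict(sample_rows):
        raise ValueError("model predictions changed after a pickle round trip")
    return restored


model = ThresholdModel(threshold=1.0)
sample = [[0.2, 0.3], [0.9, 0.4]]
assert_broadcastable(model, sample)
print(model.predict(sample))  # [0, 1]
```

The round trip only proves that the object pickles. On the cluster, the model's class must also be importable on the executors (for example, installed as a package), because pickle stores classes by reference.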
```python
import pickle

# Load the model on the driver
with open("model.pkl", "rb") as f:
    model = pickle.load(f)

bc_model = sc.broadcast(model)

features_rdd = spark.read.parquet("features.parquet").rdd

# Score each row with the executor's cached copy of the model
# (assumes a "features" column holding one feature vector per row)
predictions = features_rdd.map(
    lambda row: (row["user_id"], bc_model.value.predict([row["features"]])[0])
)
```

Lifecycle Management
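Transformations are lazy, so a broadcast must stay alive until the action that uses it has run. Destroying it after building the RDD but before calling an action breaks the job. A context manager is one way to make that lifetime explicit. `broadcast_scope` is a hypothetical helper that relies only on `SparkContext.broadcast()` and `Broadcast.destroy()`. The `Fake*` classes exist only so the sketch runs without a cluster.

```python
from contextlib import contextmanager


@contextmanager
def broadcast_scope(sc, value):
    """Broadcast value for the duration of a with-block, then destroy it."""
    bc = sc.broadcast(value)
    try:
        yield bc
    finally:
        # Runs on normal exit and on exceptions; any action using bc must finish first
        bc.destroy()


class FakeBroadcast:
    """Minimal stand-in for pyspark.Broadcast, used only to exercise the helper."""

    def __init__(self, value):
        self.value = value
        self.destroyed = False

    def destroy(self):
        self.destroyed = True


class FakeContext:
    """Minimal stand-in for SparkContext.broadcast()."""

    def broadcast(self, value):
        return FakeBroadcast(value)


with broadcast_scope(FakeContext(), {"P0001": "Product 1"}) as bc:
    # With a real SparkContext, run the action (count, collect, write) inside the block
    result = bc.value.get("P0001", "Unknown")

print(result, bc.destroyed)  # Product 1 True
```

With a real SparkContext, the block would wrap both the transformation and its action, e.g. `with broadcast_scope(sc, product_names) as bc: n = rdd.map(lambda pid: bc.value.get(pid, "Unknown")).count()`.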
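```python
# Unpersist: delete the cached copies on executors; if the broadcast is used
# again, it is re-sent from the driver
broadcast_names.unpersist()

# Destroy: remove all data and metadata for the broadcast, on the driver and executors
broadcast_names.destroy()

# After destroy(), the broadcast cannot be used again: jobs that reference it fail
```

Size Guidelines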
| Size of the smaller dataset | Approach |
|---|---|
| < 10 MB | Auto-broadcast via threshold |
| 10 – 200 MB | Manual broadcast() |
| > 200 MB | Don’t broadcast — use sort-merge join |
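The table above can be encoded as a small helper, for example to choose a strategy programmatically. `choose_join_strategy` is hypothetical. The 10 MB cutoff mirrors the default `spark.sql.autoBroadcastJoinThreshold`, while the 200 MB cutoff is this article's rule of thumb rather than a Spark setting. Spark's threshold is specified in bytes, which is why the 50 MB setting earlier is written as 52428800.

```python
MIB = 1024 * 1024
AUTO_BROADCAST_DEFAULT = 10 * MIB   # default spark.sql.autoBroadcastJoinThreshold (10485760 bytes)
MANUAL_BROADCAST_LIMIT = 200 * MIB  # rule of thumb from the table, not a Spark setting


def choose_join_strategy(small_side_bytes: int) -> str:
    """Map the estimated size of the smaller join side to the approach in the table."""
    if small_side_bytes < AUTO_BROADCAST_DEFAULT:
        return "auto-broadcast"
    if small_side_bytes <= MANUAL_BROADCAST_LIMIT:
        return "manual broadcast()"
    return "sort-merge join"


for size_mb in (5, 50, 500):
    print(size_mb, "MB ->", choose_join_strategy(size_mb * MIB))

print(50 * MIB)  # 52428800, the threshold value used earlier for 50 MB
```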