Spark Joins
Joins combine two DataFrames based on a matching condition. Spark supports all standard SQL join types plus broadcast join optimization for small tables.
All Join Types
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("Joins Demo").getOrCreate()
employees = spark.createDataFrame(
    [(1, "Alice", 10), (2, "Bob", 20), (3, "Carol", 30), (4, "Dave", 99)],
    ["emp_id", "name", "dept_id"],
)  # Dave's dept_id (99) has no matching department row
departments = spark.createDataFrame(
    [(10, "Engineering"), (20, "Marketing"), (30, "HR")],
    ["dept_id", "dept_name"],
)
Inner Join
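An inner join emits one output row for every (left row, right row) pair whose keys are equal. The plain-Python model below sketches that rule; it is not how Spark executes the join. Its rows are hypothetical: dept_id 10 appears twice on both sides, so that key alone produces 2 × 2 = 4 rows. Duplicate keys on both sides are the usual cause of unexpectedly large join outputs.

```python
# Plain-Python model of inner-join semantics (a sketch, not Spark's execution strategy).
# Hypothetical rows: dept_id 10 appears twice on BOTH sides.
employees = [(1, "Alice", 10), (2, "Bob", 10), (3, "Carol", 20)]
departments = [(10, "Engineering"), (10, "Engineering (duplicate row)"), (20, "Marketing")]


def inner_join(left, right, left_key, right_key):
    """Return one (left_row, right_row) pair per matching pair; unmatched rows are dropped."""
    return [
        (l, r)
        for l in left
        for r in right
        # SQL NULL never equals anything, so a None key cannot match
        if left_key(l) is not None and left_key(l) == right_key(r)
    ]


pairs = inner_join(employees, departments, lambda e: e[2], lambda d: d[0])
for emp, dept in pairs:
    print(emp[1], "->", dept[1])

# dept 10: 2 employees x 2 department rows = 4 pairs; dept 20: 1 x 1 = 1 pair
print(len(pairs))  # 5
```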
Returns only rows where the join key exists in both DataFrames:
inner = employees.join(departments, "dept_id", "inner")
inner.show()
# Dave (dept_id=99) is excluded: no matching department
Left (Outer) Join
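Conceptually, a left join is the inner join plus every left row that found no match, with the right-side columns filled with nulls. The following plain-Python sketch models the result on the same sample rows. It does not model Spark's execution, and None stands in for SQL NULL:

```python
# Plain-Python model of left-join semantics on the sample rows (None plays the role of NULL).
employees = [(1, "Alice", 10), (2, "Bob", 20), (3, "Carol", 30), (4, "Dave", 99)]
departments = [(10, "Engineering"), (20, "Marketing"), (30, "HR")]


def left_join(left, right, left_key, right_key, right_width):
    out = []
    for l in left:
        k = left_key(l)
        matches = [r for r in right if k is not None and k == right_key(r)]
        if matches:
            out.extend((l, r) for r in matches)  # matched: same rows as the inner join
        else:
            out.append((l, (None,) * right_width))  # unmatched: keep it, pad the right side
    return out


rows = left_join(employees, departments, lambda e: e[2], lambda d: d[0], right_width=2)
for emp, dept in rows:
    print(emp[1], dept[1])
# Alice Engineering
# Bob Marketing
# Carol HR
# Dave None
```

Because every left row contributes at least one output row, a left join never returns fewer rows than the left DataFrame has.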
Returns all rows from the left DataFrame, with nulls for unmatched right rows:
left = employees.join(departments, "dept_id", "left")
left.show()
# Dave appears with a null dept_name
Right (Outer) Join
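A right join is the mirror image of a left join. `employees.join(departments, "dept_id", "right")` returns the same rows as `departments.join(employees, "dept_id", "left")`, only with the columns in a different order. With the sample data every department has an employee, so no nulls appear. The plain-Python sketch below adds a hypothetical Finance department (dept_id 40) with no employees to show the null padding on the left side:

```python
# Plain-Python model of right-join semantics (a sketch, not Spark's implementation).
employees = [(1, "Alice", 10), (2, "Bob", 20), (3, "Carol", 30), (4, "Dave", 99)]
# (40, "Finance") is a hypothetical extra department with no employees
departments = [(10, "Engineering"), (20, "Marketing"), (30, "HR"), (40, "Finance")]

rows = []
for dept in departments:  # drive the loop from the RIGHT side: every department is kept
    matches = [emp for emp in employees if emp[2] == dept[0]]
    if matches:
        rows.extend((emp, dept) for emp in matches)
    else:
        rows.append(((None, None, None), dept))  # employee columns become NULL

for emp, dept in rows:
    print(emp[1], dept[1])
# Alice Engineering
# Bob Marketing
# Carol HR
# None Finance   (Dave is dropped: dept_id 99 is not on the right side)
```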
Returns all rows from the right DataFrame, with nulls for unmatched left rows:
right = employees.join(departments, "dept_id", "right")
right.show()
# Every department has at least one employee, so no nulls appear; Dave is dropped
Full Outer Join
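A full outer join combines three disjoint groups:

- the matched pairs (the inner join),
- the unmatched left rows, padded with nulls,
- the unmatched right rows, padded with nulls.

Counting each group separately is a cheap sanity check on the result size. Here is a plain-Python sketch on the sample rows (a model of the result, not of Spark's execution):

```python
# Plain-Python model: |full outer| = |matched pairs| + |unmatched left| + |unmatched right|
employees = [(1, "Alice", 10), (2, "Bob", 20), (3, "Carol", 30), (4, "Dave", 99)]
departments = [(10, "Engineering"), (20, "Marketing"), (30, "HR")]

matched = [(e, d) for e in employees for d in departments if e[2] is not None and e[2] == d[0]]

# NULL keys never match, so leave None out of the key sets
dept_ids = {d[0] for d in departments if d[0] is not None}
emp_dept_ids = {e[2] for e in employees if e[2] is not None}
unmatched_left = [e for e in employees if e[2] not in dept_ids]           # NULL dept columns
unmatched_right = [d for d in departments if d[0] not in emp_dept_ids]    # NULL employee columns

full = (
    matched
    + [(e, (None, None)) for e in unmatched_left]
    + [((None, None, None), d) for d in unmatched_right]
)
print(len(matched), len(unmatched_left), len(unmatched_right), len(full))  # 3 1 0 4
```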
Returns all rows from both DataFrames, with nulls where no match exists:
full = employees.join(departments, "dept_id", "full")
full.show()
# 4 rows: the three matches plus Dave with a null dept_name (every department is matched)
Left Semi Join
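A left semi join acts as a filter on the left DataFrame, the same idea as an `EXISTS` subquery. It is not an inner join followed by dropping the right-side columns: each left row is returned at most once, however many right rows match it. The plain-Python sketch below uses a hypothetical duplicate department row to make the difference visible:

```python
# Plain-Python sketch: semi join vs. "inner join, then keep only the left columns".
employees = [(1, "Alice", 10), (2, "Bob", 20), (4, "Dave", 99)]
# Hypothetical: dept_id 10 appears twice on the right side
departments = [(10, "Engineering"), (10, "Engineering (duplicate)"), (20, "Marketing")]

# Inner join projected to employee columns: Alice repeats once per matching department row
inner_projected = [e for e in employees for d in departments if e[2] == d[0]]

# Semi join: keep each employee once if ANY department matches
dept_ids = {d[0] for d in departments if d[0] is not None}
semi = [e for e in employees if e[2] in dept_ids]

print([e[1] for e in inner_projected])  # ['Alice', 'Alice', 'Bob']
print([e[1] for e in semi])             # ['Alice', 'Bob']
```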
Returns only the rows of the left DataFrame that have a match in the right DataFrame; columns from the right side are not included:
# Which employees have a valid department?
semi = employees.join(departments, "dept_id", "left_semi")
semi.show()
# Columns from employees only (no dept_name); Dave is excluded
Left Anti Join
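With the same condition, a left semi join and a left anti join split the left DataFrame into two disjoint parts: rows with at least one match and rows with none. Their row counts therefore always add up to the left row count. A null key never satisfies an equality condition, so rows with a null join key end up in the anti-join result. The plain-Python sketch below includes a hypothetical employee, Erin, whose dept_id is None:

```python
# Plain-Python sketch: semi and anti joins partition the left side.
employees = [
    (1, "Alice", 10), (2, "Bob", 20), (3, "Carol", 30), (4, "Dave", 99),
    (5, "Erin", None),  # hypothetical employee with a NULL dept_id
]
departments = [(10, "Engineering"), (20, "Marketing"), (30, "HR")]

# NULL never equals anything in SQL, so leave None out of the key set
dept_ids = {d[0] for d in departments if d[0] is not None}


def has_match(emp):
    return emp[2] is not None and emp[2] in dept_ids


semi = [e for e in employees if has_match(e)]
anti = [e for e in employees if not has_match(e)]

print([e[1] for e in anti])  # ['Dave', 'Erin']
```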
Returns only rows from the left DataFrame where no match exists in the right:
# Which employees have an invalid (missing) department?
anti = employees.join(departments, "dept_id", "left_anti")
anti.show()
# Only Dave (dept_id=99), who has no matching department
Cross Join (Cartesian Product)
Combines every row of the left DataFrame with every row of the right, producing M × N rows:
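# Not needed for an explicit crossJoin(). On Spark 2.x this flag allows *implicit*
# Cartesian products (a join with no condition); since Spark 3.0 it defaults to true.
spark.conf.set("spark.sql.crossJoin.enabled", "true")
cross = employees.crossJoin(departments)
print(cross.count())  # 4 × 3 = 12 rows
Non-Equi Joins (Complex Conditions)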
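Conceptually, a non-equi join is a cross join followed by a filter on the join condition. The plain-Python sketch below evaluates this section's price-tier lookup that way. The Keyboard row is hypothetical. At exactly 100 it sits on the Budget/Mid boundary, and the half-open ranges (`>=` on the lower bound, `<` on the upper) put it in exactly one tier.

```python
# Plain-Python sketch: non-equi join = Cartesian product + filter on the condition.
from itertools import product

products = [("Laptop", 1200), ("Mouse", 25), ("Keyboard", 100)]  # Keyboard is hypothetical
tiers = [("Budget", 0, 100), ("Mid", 100, 500), ("Premium", 500, 10000)]

# Keep the (product, tier) pairs where min_price <= price < max_price
result = [
    (name, price, tier)
    for (name, price), (tier, lo, hi) in product(products, tiers)
    if lo <= price < hi
]
for row in result:
    print(row)
# ('Laptop', 1200, 'Premium')
# ('Mouse', 25, 'Budget')
# ('Keyboard', 100, 'Mid')
```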
products = spark.createDataFrame(
    [("Laptop", 1200), ("Mouse", 25)],
    ["product", "price"],
)
tiers = spark.createDataFrame(
    [("Budget", 0, 100), ("Mid", 100, 500), ("Premium", 500, 10000)],
    ["tier", "min_price", "max_price"],
)
# Non-equi join: price falls within the half-open range [min_price, max_price)
result = products.join(
    tiers,
    (F.col("price") >= F.col("min_price")) & (F.col("price") < F.col("max_price")),
)
result.show()
# Laptop matches Premium; Mouse matches Budget
Join Strategies and Performance
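Spark picks a physical strategy for each join, and the join condition limits the choice. Hash and sort-merge joins need an equality key. A pure range condition like the tier lookup above has none, so Spark falls back to a nested-loop strategy: BroadcastNestedLoopJoin, or, for inner joins where neither side is small enough to broadcast, CartesianProduct. Range joins on large inputs are expensive for this reason.

For equi-joins, the idea behind a broadcast hash join fits in a few lines of plain Python: build a hash table from the small side once, then stream the large side and probe it. This sketches the technique only; it is not Spark's code.

```python
# Plain-Python sketch of a hash join: build on the small side, probe with the large side.
from collections import defaultdict

employees = [(1, "Alice", 10), (2, "Bob", 20), (3, "Carol", 30), (4, "Dave", 99)]  # "large" side
departments = [(10, "Engineering"), (20, "Marketing"), (30, "HR")]                 # "small" side


def hash_join(large, small, large_key, small_key):
    # Build phase: one pass over the small side (in Spark, this is the table that gets broadcast)
    table = defaultdict(list)
    for row in small:
        if small_key(row) is not None:  # NULL keys can never match
            table[small_key(row)].append(row)
    # Probe phase: one pass over the large side; each lookup is O(1) on average
    return [(row, match) for row in large for match in table.get(large_key(row), [])]


joined = hash_join(employees, departments, lambda e: e[2], lambda d: d[0])
for emp, dept in joined:
    print(emp[1], dept[1])
# Alice Engineering
# Bob Marketing
# Carol HR
```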
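# Broadcast join: copies the small table to every executor, so the large side is not shuffled.
# Spark does this automatically below spark.sql.autoBroadcastJoinThreshold (10 MB by default);
# the explicit broadcast() hint requests it even above that threshold.
from pyspark.sql.functions import broadcast
fast_join = employees.join(broadcast(departments), "dept_id")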
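# Hint-based broadcast (useful when Spark's size estimate misses a small table).
# Attach the hint to the side to broadcast: here departments, not employees.
hinted = employees.join(departments.hint("broadcast"), "dept_id")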
# Raise the auto-broadcast threshold to 50 MB (in bytes: 50 * 1024 * 1024 = 52428800);
# setting it to -1 disables automatic broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "52428800")
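# Sort-merge join: the default for joining two large tables on an equality key.
# Spark uses it automatically when broadcast is not applicable; call .explain() on a joined
# DataFrame and look for BroadcastHashJoin or SortMergeJoin in the physical plan.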
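A sort-merge join does not require either side to fit in memory. Spark shuffles both sides by the join key, sorts each partition, and walks the two sorted streams in step. The plain-Python sketch below shows only that merge step, on a single in-memory "partition" with no shuffle. The inputs are deliberately unsorted, and Bob is hypothetically moved to dept_id 10 so that one key repeats on the left side.

```python
# Plain-Python sketch of the merge step of a sort-merge (inner, equi) join.
employees = [(3, "Carol", 30), (1, "Alice", 10), (4, "Dave", 99), (2, "Bob", 10)]  # Bob moved to 10
departments = [(30, "HR"), (10, "Engineering"), (20, "Marketing")]


def sort_merge_join(left, right, left_key, right_key):
    """Sort both inputs by key, then merge them (assumes no None keys)."""
    left = sorted(left, key=left_key)
    right = sorted(right, key=right_key)
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        lk, rk = left_key(left[i]), right_key(right[j])
        if lk < rk:
            i += 1  # left key is behind: advance the left stream
        elif lk > rk:
            j += 1  # right key is behind: advance the right stream
        else:
            # Equal keys: find the run of this key on each side and emit every pair
            i_end = i
            while i_end < len(left) and left_key(left[i_end]) == lk:
                i_end += 1
            j_end = j
            while j_end < len(right) and right_key(right[j_end]) == rk:
                j_end += 1
            out.extend((l, r) for l in left[i:i_end] for r in right[j:j_end])
            i, j = i_end, j_end
    return out


joined = sort_merge_join(employees, departments, lambda e: e[2], lambda d: d[0])
for emp, dept in joined:
    print(emp[1], dept[1])
# Alice Engineering
# Bob Engineering
# Carol HR
```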
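# Adaptive Query Execution (AQE) can switch join strategies at runtime, for example turning a
# sort-merge join into a broadcast join once the actual size of one side is known.
# It is enabled by default since Spark 3.2.
spark.conf.set("spark.sql.adaptive.enabled", "true")
Avoiding Duplicate Column Names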
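# Hypothetical DataFrames that both contain an "id" column
df1 = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
df2 = spark.createDataFrame([(1, "Engineering"), (2, "Marketing")], ["id", "dept_name"])
# Joining on an expression keeps both "id" columns, so a later reference to "id" is ambiguous
result = df1.join(df2, df1.id == df2.id)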
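# Fix with aliases: qualify each column with its DataFrame's alias
result = (
    df1.alias("a")
    .join(df2.alias("b"), F.col("a.id") == F.col("b.id"))
    .select("a.id", "a.name", "b.dept_name")
)
# For a plain equi-join, passing the column name instead, df1.join(df2, "id"), keeps a single "id" column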