
Apache Spark 49 guides · updated 2026

Distributed data processing at scale — RDDs, DataFrames, Structured Streaming, and the tuning techniques that keep Spark jobs fast and cheap.

Spark Actions

Actions are operations that trigger the actual execution of a Spark computation. Before an action is called, every transformation sits idle in a DAG — a blueprint, not running code. The moment you call an action, Spark submits a job, divides it into stages, and executes tasks across the cluster.


Why the Distinction Matters

from pyspark.sql import SparkSession
sc = SparkSession.builder.appName("Actions").getOrCreate().sparkContext
# These lines return immediately: they only build the DAG
rdd = sc.parallelize(range(1, 1_000_000))
filtered = rdd.filter(lambda x: x % 2 == 0)
doubled = filtered.map(lambda x: x * 2)
# This triggers the whole chain: real computation happens here
total = doubled.sum()  # ← ACTION
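The same deferral can be reproduced without a cluster. The sketch below is a toy plain-Python model of lazy evaluation (the `LazyPipeline` class is hypothetical and is not how Spark works internally): transformations only record steps, and the `collect()` action runs them. It also shows a consequence that applies to PySpark too: an exception inside a transformation, here a division by zero, is raised only when an action executes it, not where the transformation is defined.

```python
from typing import Any, Callable, Iterable, List, Tuple


class LazyPipeline:
    """Toy model of lazy evaluation: transformations record steps, actions run them.

    Hypothetical illustration only; this is not Spark's implementation.
    """

    def __init__(self, source: Iterable[Any], steps: Tuple[Tuple[str, Callable], ...] = ()):
        self.source = source
        self.steps = steps

    # Transformations: return a new pipeline with one more recorded step.
    def map(self, fn: Callable[[Any], Any]) -> "LazyPipeline":
        return LazyPipeline(self.source, self.steps + (("map", fn),))

    def filter(self, pred: Callable[[Any], bool]) -> "LazyPipeline":
        return LazyPipeline(self.source, self.steps + (("filter", pred),))

    # Action: only now are the recorded user functions actually called.
    def collect(self) -> List[Any]:
        out = []
        for x in self.source:
            keep = True
            for kind, fn in self.steps:
                if kind == "map":
                    x = fn(x)
                elif not fn(x):
                    keep = False
                    break
            if keep:
                out.append(x)
        return out


# Defining a faulty transformation raises nothing...
pipeline = LazyPipeline([0, 1, 2]).map(lambda x: 10 // x)

# ...the ZeroDivisionError only appears when the action runs.
try:
    pipeline.collect()
except ZeroDivisionError:
    print("error raised at action time")

# A healthy pipeline: keep evens, then double them.
print(LazyPipeline(range(1, 10)).filter(lambda x: x % 2 == 0).map(lambda x: x * 2).collect())
```

The model re-reads its source on every `collect()`, which is also why repeated actions on an uncached RDD or DataFrame repeat the work.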

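Understanding the transformation/action boundary is essential because every action launches a separate job: where you place actions determines how often Spark reads and recomputes your data, when errors in your transformations surface, and how much data ends up on the driver.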


Common RDD Actions

from pyspark.sql import SparkSession
sc = SparkSession.builder.appName("Actions").getOrCreate().sparkContext
rdd = sc.parallelize([10, 20, 30, 40, 50])
# collect — bring all data to the driver (⚠️ use only for small datasets)
rdd.collect() # [10, 20, 30, 40, 50]
# count — number of elements
rdd.count() # 5
# first — first element
rdd.first() # 10
# take(n) — first n elements in partition order (no sorting is applied)
rdd.take(3) # [10, 20, 30]
# takeSample — random sample without replacement
rdd.takeSample(False, 3, seed=42) # e.g., [30, 10, 50]
# top(n) — n largest elements, returned in descending order (natural ordering unless key= is given)
rdd.top(3) # [50, 40, 30]
# reduce — aggregate all elements with a binary function
rdd.reduce(lambda a, b: a + b) # 150
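# fold — like reduce but with a zero value, applied once per partition and again when
# merging partition results, so it should be the identity for the operation (0 for +)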
rdd.fold(0, lambda a, b: a + b) # 150
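# aggregate — one zero value plus separate functions for merging within a partition (seqOp)
# and across partitions (combOp); the result type can differ from the element type
rdd.aggregate(
    (0, 0),                                    # zero value: (sum, count)
    lambda acc, x: (acc[0] + x, acc[1] + 1),   # seqOp: add one element to a partition's accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1])    # combOp: merge accumulators from two partitions
)
# (150, 5) → avg = 150 / 5 = 30.0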
# foreach — runs a function on each element (side effects only, returns None)
rdd.foreach(lambda x: print(f"Value: {x}"))  # Runs on executors; on a cluster, output goes to executor logs
# foreachPartition — one function call per partition (good for DB connections)
def write_to_db(partition):
    conn = connect_to_db()  # hypothetical helper: open one connection per partition
    try:
        for record in partition:
            conn.insert(record)  # hypothetical client method
    finally:
        conn.close()  # close even if an insert fails
rdd.foreachPartition(write_to_db)
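`fold` and `aggregate` are easiest to reason about one partition at a time. The sketch below is a plain-Python model, not PySpark: the hypothetical helpers `model_fold` and `model_aggregate` take an explicit list of partitions, start each partition from the zero value, and start from it once more when merging the partition results, mirroring how `RDD.fold` is documented to behave. The second `model_fold` call shows why the zero value should be the identity for the operation: a zero of `1` for addition is added once per partition plus once at the merge. `model_aggregate` follows the same pattern with separate `seq_op` and `comb_op` functions.

```python
from functools import reduce
from operator import add
from typing import Callable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def model_fold(partitions: List[List[T]], zero: T, op: Callable[[T, T], T]) -> T:
    """Plain-Python model of RDD.fold (hypothetical helper, not PySpark)."""
    # Each partition's fold starts from the zero value.
    per_partition = [reduce(op, part, zero) for part in partitions]
    # Partition results are merged starting from the zero value again.
    return reduce(op, per_partition, zero)


def model_aggregate(
    partitions: List[List[T]],
    zero: U,
    seq_op: Callable[[U, T], U],
    comb_op: Callable[[U, U], U],
) -> U:
    """Plain-Python model of RDD.aggregate (hypothetical helper, not PySpark)."""
    # seq_op folds elements into a per-partition accumulator of type U.
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    # comb_op merges accumulators coming from different partitions.
    return reduce(comb_op, per_partition, zero)


# The five values from the example above, split into two partitions.
parts = [[10, 20], [30, 40, 50]]

print(model_fold(parts, 0, add))  # identity zero: 150, same as reduce
print(model_fold(parts, 1, add))  # non-identity zero: 150 + 2 partitions + 1 merge = 153

total, count = model_aggregate(
    parts,
    (0, 0),  # tuples are immutable, so sharing the zero value is safe here
    lambda acc, x: (acc[0] + x, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
print(total / count)  # 30.0
```

Because the result of a non-identity zero depends on how many partitions the RDD happens to have, the same call can return different values after a repartition.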

Common DataFrame Actions

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Actions").getOrCreate()
df = spark.read.parquet("employees.parquet")
# show — print rows to stdout (driver only)
df.show()
df.show(20, truncate=False) # Don't truncate long strings
df.show(5, vertical=True)  # Each row printed as a block, one line per column
# count
df.count()  # Runs a job over every partition of the input
# collect — returns list of Row objects
rows = df.collect()
for row in rows:
    print(row["name"], row["salary"])
# take(n) / head(n)
df.take(5) # List of 5 Row objects
df.head(5) # Same as take(5)
# first
df.first() # First Row object
# toPandas — converts entire DataFrame to pandas (⚠️ must fit in driver memory)
import pandas as pd
pdf: pd.DataFrame = df.toPandas()
# Summary statistics
df.describe("salary").show()
df.summary().show() # Extended stats including percentiles
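The `truncate` argument of `show()` only affects how cells are printed, not the data. The sketch below is a plain-Python model of the per-cell rule (the helper `truncate_cell` is hypothetical; column padding and the formatting of non-string values are left out): with the default `truncate=True`, treated as a limit of 20 characters, longer strings keep their first 17 characters followed by `...`, while `truncate=False` prints them in full.

```python
def truncate_cell(value: str, truncate: int = 20) -> str:
    """Model of how DataFrame.show() shortens one cell (hypothetical helper).

    truncate=20 models the default show(); truncate=0 models truncate=False.
    """
    if truncate > 0 and len(value) > truncate:
        # Very small limits are cut without an ellipsis.
        if truncate < 4:
            return value[:truncate]
        return value[: truncate - 3] + "..."
    return value


title = "Principal Distributed Systems Engineer"
print(truncate_cell(title))      # first 17 characters + "..."
print(truncate_cell(title, 0))   # full string, like truncate=False
print(truncate_cell("short"))    # unchanged
```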

Save Actions

# RDD saves
rdd.saveAsTextFile("hdfs://output/data/")
rdd.saveAsPickleFile("hdfs://output/pickled/")
# DataFrame saves
df.write.mode("overwrite").parquet("s3://bucket/employees/")
df.write.mode("append").format("delta").save("s3://bucket/delta/employees/")  # Requires the Delta Lake package
df.write.option("header", True).csv("output/report.csv")  # Creates a directory named report.csv containing part files
# Write with partitioning
df.write \
    .partitionBy("year", "month") \
    .mode("overwrite") \
    .parquet("s3://bucket/partitioned/")
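Each save action above takes a directory path rather than a file path, and Spark writes part files into it. With `partitionBy`, rows are also routed into Hive-style subdirectories named `column=value`, for example `year=2024/month=1/`, and the partition columns are stored in the path rather than inside the data files. The sketch below is a plain-Python model of that routing only; the helpers `partition_dir` and `route_rows` and the sample rows are hypothetical, and real file names, escaping of special characters, and null handling are left out.

```python
from collections import defaultdict
from typing import Dict, List, Mapping, Sequence


def partition_dir(row: Mapping[str, object], cols: Sequence[str]) -> str:
    """Hive-style subdirectory for one row, e.g. 'year=2024/month=1' (hypothetical helper)."""
    return "/".join(f"{c}={row[c]}" for c in cols)


def route_rows(rows: List[Dict[str, object]], cols: Sequence[str]) -> Dict[str, List[Dict[str, object]]]:
    """Group rows by target subdirectory; partition columns are dropped from the
    row data because their values are encoded in the path instead."""
    layout: Dict[str, List[Dict[str, object]]] = defaultdict(list)
    for row in rows:
        data = {k: v for k, v in row.items() if k not in cols}
        layout[partition_dir(row, cols)].append(data)
    return dict(layout)


rows = [
    {"name": "Ana", "year": 2024, "month": 1},
    {"name": "Bo", "year": 2024, "month": 2},
    {"name": "Cy", "year": 2024, "month": 1},
]
for path, data in route_rows(rows, ["year", "month"]).items():
    print(f"s3://bucket/partitioned/{path}/", data)
```

This layout is what lets a later read with a filter such as `year = 2024 AND month = 1` skip the other directories entirely.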

Pitfalls to Avoid

# BAD: collect() on a large dataset can crash the driver
big_df.collect()  # OutOfMemoryError, or a job failure once results exceed spark.driver.maxResultSize
# GOOD: use take() or write to storage instead
big_df.take(1000)
big_df.write.parquet("output/")
# BAD: count() in a loop launches a separate job, and a separate scan, each time
for dept in ["Eng", "Mkt", "HR"]:
    n = df.filter(df.dept == dept).count()  # 3 separate jobs
# GOOD: aggregate in one pass
df.groupBy("dept").count().collect()  # One pass over df for all departments
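# BAD: multiple actions on an uncached DataFrame re-read the source each time
n = df.filter(...).count()  # Reads the data
df.filter(...).show()       # Reads the data again (show() returns None)
# GOOD: cache before multiple actions
df.cache()                  # Lazy: only marks df for caching, nothing is stored yet
n = df.filter(...).count()  # First action reads the source and fills the cache
df.filter(...).show()       # Reuses the cached data
df.unpersist()              # Release the cached blocks when done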
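These pitfalls come down to how many times the source is scanned. The plain-Python sketch below counts scans of a hypothetical `CountingSource` wrapped in a hypothetical `ToyFrame`; it models the behaviour described above and is not Spark code. One action per department costs one scan each, a single grouped count costs one scan, and a cache that is filled by the first action lets later actions skip the source.

```python
from collections import Counter
from typing import Dict, Iterator, List, Optional


class CountingSource:
    """Hypothetical data source that records how many times it is scanned."""

    def __init__(self, rows: List[Dict[str, str]]):
        self._rows = rows
        self.scans = 0

    def scan(self) -> Iterator[Dict[str, str]]:
        self.scans += 1
        return iter(self._rows)


class ToyFrame:
    """Toy stand-in for a DataFrame: every action re-scans the source unless cached."""

    def __init__(self, source: CountingSource):
        self.source = source
        self._cached: Optional[List[Dict[str, str]]] = None
        self._cache_requested = False

    def cache(self) -> "ToyFrame":
        # Like df.cache(): only marks the frame, nothing is read yet.
        self._cache_requested = True
        return self

    def _rows(self) -> List[Dict[str, str]]:
        if self._cached is not None:
            return self._cached  # served from the cache, no scan
        rows = list(self.source.scan())
        if self._cache_requested:
            self._cached = rows  # the first action fills the cache
        return rows

    # "Actions" on the toy frame.
    def count_where(self, dept: str) -> int:
        return sum(1 for r in self._rows() if r["dept"] == dept)

    def count_by_dept(self) -> Dict[str, int]:
        return dict(Counter(r["dept"] for r in self._rows()))


data = [{"dept": d} for d in ["Eng", "Eng", "Mkt", "HR", "Eng"]]

# BAD: one action per department, one scan each.
src = CountingSource(data)
frame = ToyFrame(src)
per_dept = {d: frame.count_where(d) for d in ["Eng", "Mkt", "HR"]}
print(per_dept, "scans:", src.scans)

# GOOD: one grouped pass, one scan.
src = CountingSource(data)
grouped = ToyFrame(src).count_by_dept()
print(grouped, "scans:", src.scans)

# GOOD: cache, then run several actions; the source is read once.
src = CountingSource(data)
cached = ToyFrame(src).cache()
cached.count_where("Eng")
cached.count_by_dept()
print("scans with cache:", src.scans)
```

In real Spark a cached DataFrame can still be partly recomputed if its blocks are evicted, so the model shows the best case.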