Saving a PySpark DataFrame as Parquet
Parquet is Spark's default data source format and the usual recommendation for Spark output. It is columnar (which enables predicate and projection pushdown), self-describing (the schema is embedded in each file), and compressed by default (Snappy, unless configured otherwise). Most lakehouse architectures build on Parquet or on Parquet-based table formats such as Delta Lake.
Basic Parquet Write
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("WriteParquet").getOrCreate()
data = [("Alice", "Engineering", 95000), ("Bob", "Marketing", 72000)]
df = spark.createDataFrame(data, ["name", "department", "salary"])
# Write Parquet
df.write.mode("overwrite").parquet("output/employees/")
Write Modes
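The save modes in this section differ only in what happens when the target path already exists. The decision table can be sketched in plain Python as follows; `resolve_save_mode` and the action labels it returns are hypothetical names used for illustration, not part of the Spark API, and Spark performs the equivalent check internally.

```python
def resolve_save_mode(mode: str, path_exists: bool) -> str:
    """Return the action a writer takes for a save mode (illustration only)."""
    mode = mode.lower()
    if mode not in ("error", "errorifexists", "ignore", "overwrite", "append"):
        raise ValueError(f"unknown save mode: {mode}")
    if not path_exists:
        # Every mode simply writes when nothing is there yet.
        return "write"
    if mode in ("error", "errorifexists"):
        return "fail"      # default behaviour: refuse to touch existing data
    if mode == "ignore":
        return "skip"      # silently do nothing
    if mode == "overwrite":
        return "replace"   # drop the existing data, then write
    return "add"           # append: keep existing files and add new ones


for mode in ("error", "ignore", "overwrite", "append"):
    print(mode, "->", resolve_save_mode(mode, path_exists=True))
```

Because `error` is the default, a job that is re-run against the same path fails unless one of the other modes is set explicitly.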
df.write.mode("overwrite").parquet("output/")  # Replace all existing data
df.write.mode("append").parquet("output/")     # Add to existing data
df.write.mode("ignore").parquet("output/")     # No-op if path exists
df.write.mode("error").parquet("output/")      # Default: raise an error if path exists
Compression Codecs
# Snappy: the default; fast compression and decompression, moderate compression ratio
df.write.option("compression", "snappy").parquet("output/")
# Gzip: better compression ratio, slower to write and to read
df.write.option("compression", "gzip").parquet("output/")
# Zstandard: usually a good balance of ratio and speed (recommended 2025)
df.write.option("compression", "zstd").parquet("output/")
# No compression: larger files, but no decompression cost on read
df.write.option("compression", "none").parquet("output/")
Partitioning
Partitioning by commonly filtered columns can dramatically improve query performance by enabling partition pruning (Spark skips partitions that cannot match the filter):
# Partition by year and region
df.write \
    .mode("overwrite") \
    .partitionBy("year", "region") \
    .parquet("s3://bucket/sales/")
# Output structure:
# s3://bucket/sales/year=2024/region=APAC/part-00000.snappy.parquet
# s3://bucket/sales/year=2025/region=EMEA/part-00000.snappy.parquet
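To make the directory layout and the pruning step concrete, here is a plain-Python sketch of how partition values map to `column=value` directories and how a filter on those columns narrows the set of directories to read. The helpers `partition_path` and `prune` are hypothetical and simplified (for instance, Spark also escapes special characters in partition values); they only illustrate the idea.

```python
from typing import Dict, List


def partition_path(base: str, partition_values: Dict[str, object]) -> str:
    """Build the column=value directory for one combination of partition values."""
    parts = [f"{col}={val}" for col, val in partition_values.items()]
    return "/".join([base.rstrip("/")] + parts) + "/"


def prune(dirs: List[str], wanted: Dict[str, object]) -> List[str]:
    """Keep only directories whose column=value segments match every wanted value."""
    kept = []
    for d in dirs:
        segments = dict(
            seg.split("=", 1) for seg in d.strip("/").split("/") if "=" in seg
        )
        if all(segments.get(col) == str(val) for col, val in wanted.items()):
            kept.append(d)
    return kept


dirs = [
    partition_path("s3://bucket/sales/", {"year": 2024, "region": "APAC"}),
    partition_path("s3://bucket/sales/", {"year": 2025, "region": "APAC"}),
    partition_path("s3://bucket/sales/", {"year": 2025, "region": "EMEA"}),
]
selected = prune(dirs, {"year": 2025, "region": "APAC"})
print(selected)  # ['s3://bucket/sales/year=2025/region=APAC/']
```

Only one of the three directories survives the filter, so the files under the other two never need to be opened.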
# Query with partition pruning: Spark only reads the relevant directories
spark.read \
    .parquet("s3://bucket/sales/") \
    .filter("year = 2025 AND region = 'APAC'") \
    .show()
Controlling Output File Size
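A common rule of thumb for choosing a partition count is to divide the estimated output size by a target file size. The sketch below shows that arithmetic in plain Python; the function name, the 128 MiB default, and the 6 GiB estimate are illustrative assumptions, not Spark settings.

```python
def partitions_for_target_size(
    total_bytes: int, target_file_bytes: int = 128 * 1024 * 1024
) -> int:
    """Number of partitions so that each output file is roughly target_file_bytes."""
    if total_bytes < 0 or target_file_bytes <= 0:
        raise ValueError("sizes must be non-negative and the target must be positive")
    # Integer ceiling division; always return at least one partition.
    return max(1, (total_bytes + target_file_bytes - 1) // target_file_bytes)


estimated_output_bytes = 6 * 1024**3  # hypothetical: about 6 GiB of Parquet output
num_partitions = partitions_for_target_size(estimated_output_bytes)
print(num_partitions)  # 48
```

The result could then be passed to `df.repartition(num_partitions)` before writing. Treat it as a starting point: compression ratios vary, and with `partitionBy` each task writes a separate file for every partition value it holds, so the final file count can be higher.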
# Repartition before writing for even output file sizes
df.repartition(50).write.mode("overwrite").parquet("output/")
# Single file output (for small datasets)
df.coalesce(1).write.mode("overwrite").parquet("output/single/")
# Let AQE (Spark 3.x) size shuffle partitions; this shapes output files only when the write follows a shuffle
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "134217728")  # 128 MB, advisory rather than exact
# Note: spark.sql.files.maxPartitionBytes controls input split size when reading files, not output file size
Delta Lake (Recommended for Production)
Delta Lake builds on Parquet and adds ACID transactions, schema enforcement, and time travel:
# Write Delta table
df.write.format("delta").mode("overwrite").save("s3://bucket/delta/employees/")
# Append
df.write.format("delta").mode("append").save("s3://bucket/delta/employees/")
# Read back
delta_df = spark.read.format("delta").load("s3://bucket/delta/employees/")
# Time travel: read a historical snapshot
spark.read \
    .format("delta") \
    .option("versionAsOf", "3") \
    .load("s3://bucket/delta/employees/") \
    .show()
# Upsert (merge); assumes the table and df both have an employee_id column
from delta.tables import DeltaTable
target = DeltaTable.forPath(spark, "s3://bucket/delta/employees/")
target.alias("t").merge(
    df.alias("s"),
    "t.employee_id = s.employee_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Schema Evolution
# Allow adding new columns without rewriting the entire table
df_with_new_col.write \
    .format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save("s3://bucket/delta/employees/")
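The effect of `mergeSchema` on an append can be pictured with a small plain-Python model: new columns are added to the end of the table schema, and rows written earlier read back as null for them. This is a conceptual sketch, not Delta Lake's implementation; `merge_schema`, `read_with_schema`, and the sample rows (including the `start_year` column) are hypothetical.

```python
from typing import Dict, List

Row = Dict[str, object]


def merge_schema(table_columns: List[str], incoming_columns: List[str]) -> List[str]:
    """Existing columns keep their order; unseen columns are appended at the end."""
    return table_columns + [c for c in incoming_columns if c not in table_columns]


def read_with_schema(rows: List[Row], columns: List[str]) -> List[Row]:
    """Project rows onto the merged schema, filling missing columns with None."""
    return [{c: row.get(c) for c in columns} for row in rows]


existing_rows = [{"name": "Alice", "department": "Engineering", "salary": 95000}]
new_rows = [{"name": "Cara", "department": "Sales", "salary": 68000, "start_year": 2025}]

merged_columns = merge_schema(list(existing_rows[0]), list(new_rows[0]))
table = read_with_schema(existing_rows + new_rows, merged_columns)

print(merged_columns)  # ['name', 'department', 'salary', 'start_year']
print(table[0]["start_year"], table[1]["start_year"])  # None 2025
```

The existing row is not rewritten; it simply has no value for the new column, which is why the append does not require rewriting the whole table.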