Apache Spark DataFrames
A DataFrame is Spark’s primary abstraction for structured data — a distributed table with named columns and a schema. Unlike RDDs, DataFrames are optimized by the Catalyst query optimizer, which generates efficient execution plans regardless of whether you use Python, Scala, or SQL.
Creating DataFrames
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
spark = SparkSession.builder.appName("DataFrame Demo").getOrCreate()
# From a Python list
data = [
    ("Alice", "Engineering", 95000),
    ("Bob", "Marketing", 72000),
    ("Carol", "Engineering", 110000),
    ("Dave", "HR", 65000),
]
df = spark.createDataFrame(data, ["name", "department", "salary"])
# With explicit schema
schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("department", StringType(), nullable=True),
    StructField("salary", IntegerType(), nullable=True),
])
df = spark.createDataFrame(data, schema)
# From files
df_csv = spark.read.option("header", True).option("inferSchema", True).csv("data/employees.csv")
df_json = spark.read.json("data/events/*.json")
df_parquet = spark.read.parquet("s3://bucket/transactions/")
df_delta = spark.read.format("delta").load("s3://bucket/delta-table/")
Schema and Inspection
df.printSchema()
# root
# |-- name: string (nullable = false)
# |-- department: string (nullable = true)
# |-- salary: integer (nullable = true)
df.show(5)
df.show(5, truncate=False)
df.dtypes             # [('name', 'string'), ('department', 'string'), ('salary', 'int')]
df.columns            # ['name', 'department', 'salary']
df.count()            # 4
df.describe().show()  # Summary stats
Transformations
from pyspark.sql import functions as F
# Select columns
df.select("name", "salary")
df.select(F.col("name"), F.col("salary") * 1.1)
# Filter rows
df.filter(df.salary > 80000)
df.filter(F.col("department") == "Engineering")
df.where("salary BETWEEN 70000 AND 120000")
# Add / rename columns
df.withColumn("salary_bonus", F.col("salary") * 0.1)
df.withColumnRenamed("salary", "annual_salary")
# Drop columns
df.drop("department")
# Sort
df.orderBy(F.col("salary").desc())
df.sort("department", F.col("salary").desc())
# Limit
df.limit(10)
Aggregations and Grouping
# Group by + aggregate
dept_stats = df.groupBy("department").agg(
    F.count("name").alias("headcount"),
    F.avg("salary").alias("avg_salary"),
    F.max("salary").alias("max_salary"),
    F.sum("salary").alias("total_salary")
)
dept_stats.show()
# +------------+---------+----------+----------+------------+
# |department  |headcount|avg_salary|max_salary|total_salary|
# +------------+---------+----------+----------+------------+
# |Engineering |2        |102500.0  |110000    |205000      |
# |Marketing   |1        |72000.0   |72000     |72000       |
# |HR          |1        |65000.0   |65000     |65000       |
# +------------+---------+----------+----------+------------+
# Window functions
from pyspark.sql.window import Window
window = Window.partitionBy("department").orderBy(F.col("salary").desc())
df.withColumn("rank_in_dept", F.rank().over(window)).show()
SQL Queries
# Register as temporary view
df.createOrReplaceTempView("employees")
# Run SQL
result = spark.sql("""
    SELECT department,
           COUNT(*) as headcount,
           ROUND(AVG(salary), 0) as avg_salary
    FROM employees
    WHERE salary > 60000
    GROUP BY department
    ORDER BY avg_salary DESC
""")
result.show()
Joins
departments = spark.createDataFrame([
    ("Engineering", "Building A"),
    ("Marketing", "Building B"),
    ("HR", "Building C"),
], ["dept_name", "location"])
# Inner join
df.join(departments, df.department == departments.dept_name, "inner")
# Left join
df.join(departments, df.department == departments.dept_name, "left")
# Broadcast join (small table — avoids shuffle)
from pyspark.sql.functions import broadcast
df.join(broadcast(departments), df.department == departments.dept_name)
Writing Data
# Write to various formats
df.write.mode("overwrite").parquet("s3://bucket/output/")
df.write.mode("append").json("hdfs://output/json/")
df.write.option("header", True).csv("output/employees.csv")
# Partition by column (improves read performance)
df.write.partitionBy("department").mode("overwrite").parquet("s3://bucket/partitioned/")
# Delta Lake (2025 standard for lakehouse)
df.write.format("delta").mode("overwrite").save("s3://bucket/delta/employees/")
Performance Tips
# Cache a frequently reused DataFrame
df.cache()
df.persist()  # Can take an explicit storage level, e.g. StorageLevel.DISK_ONLY
# Unpersist when done
df.unpersist()
# Repartition for parallelism
df.repartition(200)  # Shuffle — even distribution
df.coalesce(4)       # No shuffle — reduce partitions
# Avoid UDFs when built-in functions exist
# BAD: df.withColumn("upper_name", F.udf(lambda x: x.upper())(F.col("name")))
# GOOD:
df.withColumn("upper_name", F.upper(F.col("name")))