Creating a PySpark DataFrame from Files
Spark’s DataFrameReader (accessed via spark.read) supports most common file formats with extensive options for handling encoding, schema, and malformed data.
Reading CSV Files
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
spark = SparkSession.builder.appName("FileReader").getOrCreate()
# Minimal: infer schema from content (slow for large files, because inference needs an extra pass over the data)
df = spark.read.option("header", True).option("inferSchema", True).csv("employees.csv")
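To make the cost and the risk of inference concrete, here is a plain-Python sketch of value-based type inference. It is only an illustration: the `infer_column_type` helper and the sample data are hypothetical, and Spark's own inference rules are more involved.

```python
import csv
import io

# Hypothetical sample data: zip_code looks numeric but is really an identifier.
SAMPLE = "id,zip_code,salary\n1,02134,5200.50\n2,10001,4800\n"


def infer_column_type(values):
    """Return the narrowest type name that fits every value (hypothetical helper)."""
    for type_name, caster in (("int", int), ("double", float)):
        try:
            for value in values:
                caster(value)
            return type_name
        except ValueError:
            continue
    return "string"


# Every row has to be read before a type can be chosen for any column.
rows = list(csv.DictReader(io.StringIO(SAMPLE)))
inferred = {col: infer_column_type([row[col] for row in rows]) for col in rows[0]}
print(inferred)
```

In this sketch `zip_code` is typed as an integer, which would drop the leading zero in `02134`. Declaring the column as a string in an explicit schema avoids that class of surprise and skips the extra scan.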
# Production: explicit schema (fast, predictable)
schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("department", StringType()),
    StructField("salary", DoubleType()),
    StructField("hire_date", DateType()),
])
# mode is one of PERMISSIVE (the default), DROPMALFORMED, or FAILFAST
df = spark.read \
    .schema(schema) \
    .option("header", "true") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("nullValue", "N/A") \
    .option("emptyValue", "") \
    .option("mode", "DROPMALFORMED") \
    .option("encoding", "UTF-8") \
    .csv("s3://bucket/employees/")
Reading JSON Files
# Newline-delimited JSON (NDJSON): one JSON object per line (the default)
df = spark.read.json("s3://bucket/events/*.json")
# Multi-line JSON: a single document or array that spans several lines
df = spark.read.option("multiLine", "true").json("config.json")
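The difference between the two layouts is easier to see outside Spark. The sketch below uses only the standard `json` module to parse text one line at a time, which is roughly what a line-oriented reader does. The sample strings and the `parse_line_by_line` helper are made up for illustration and are not Spark's implementation.

```python
import json

# NDJSON: every line is a complete JSON object.
ndjson_text = '{"event_id": "a1", "success": true}\n{"event_id": "b2", "success": false}\n'

# Pretty-printed JSON: one object spread over several lines.
pretty_text = json.dumps({"event_id": "a1", "success": True}, indent=2)


def parse_line_by_line(text):
    """Parse each non-empty line as its own record; None marks a line that is not valid JSON."""
    records = []
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            records.append(None)
    return records


ndjson_records = parse_line_by_line(ndjson_text)   # two valid records
pretty_records = parse_line_by_line(pretty_text)   # every line fails on its own
whole_document = json.loads(pretty_text)           # parsing the whole text succeeds
```

Line-by-line parsing works for NDJSON and fails on every line of the pretty-printed file, which is why a file like that typically needs the `multiLine` option.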
# With explicit schema (avoids type inference)
from pyspark.sql.types import TimestampType, BooleanType
schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("success", BooleanType()),
])
df = spark.read.schema(schema).json("events.json")
Reading Parquet Files
Parquet embeds its schema in the file — no inference needed:
# Simple read
df = spark.read.parquet("s3://bucket/transactions/")
# Multiple paths
df = spark.read.parquet(
    "s3://bucket/2025/01/",
    "s3://bucket/2025/02/",
    "s3://bucket/2025/03/",
)
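When the list of paths follows a pattern, it can be generated rather than typed out. The sketch below assumes the `year/month` directory layout shown above; `monthly_paths` is a hypothetical helper, not part of PySpark.

```python
def monthly_paths(base, year, months):
    """Build one directory path per month, e.g. s3://bucket/2025/01/ (hypothetical helper)."""
    return [f"{base.rstrip('/')}/{year}/{month:02d}/" for month in months]


paths = monthly_paths("s3://bucket", 2025, range(1, 4))
print(paths)

# With an active SparkSession, the list can be unpacked into the reader:
# df = spark.read.parquet(*paths)
```

Because `parquet()` accepts a variable number of paths, unpacking the list with `*paths` is equivalent to passing each path as a separate argument.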
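# Read with predicate pushdown (filters applied during read)
df = spark.read \
    .parquet("s3://bucket/sales/") \
    .filter("year = 2025 AND region = 'APAC'")
# Spark pushes the filter into the Parquet scan, so row groups whose statistics rule out a match can be skipped.
# If year and region are partition columns, non-matching directories are pruned before any file is opened.
Reading Delta Lake Files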
# Delta Lake: widely recommended for production lakehouses
# (requires the Delta Lake package, e.g. delta-spark, to be configured on the session)
df = spark.read.format("delta").load("s3://bucket/delta/employees/")
# Read a specific snapshot by version number (time travel)
df = spark.read \
    .format("delta") \
    .option("versionAsOf", "5") \
    .load("s3://bucket/delta/employees/")
# Read the snapshot that was current at a given timestamp
df = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2025-01-01") \
    .load("s3://bucket/delta/employees/")
Reading ORC and Avro
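# ORC: optimized for Hive workloads
df = spark.read.orc("hdfs://data/warehouse/")
# Avro: schema-embedded binary format
# Requires the external spark-avro module. It has been available since Spark 2.4 but is not bundled by default;
# add it with e.g. --packages org.apache.spark:spark-avro_<scala-version>:<spark-version>
df = spark.read.format("avro").load("s3://bucket/avro-data/")
Handling Multiple Files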
# Wildcard: read all matching files as one DataFrame
df = spark.read.parquet("s3://bucket/logs/2025-06-*.parquet")
# Recursive directory scan (note: recursiveFileLookup disables partition inference,
# so key=value directory names are not turned into columns)
df = spark.read \
    .option("recursiveFileLookup", "true") \
    .parquet("s3://bucket/partitioned-data/")
# Add filename column (useful for debugging and auditing)
from pyspark.sql import functions as F
df = spark.read \
    .option("pathGlobFilter", "*.csv") \
    .option("recursiveFileLookup", "true") \
    .csv("s3://bucket/uploads/") \
    .withColumn("source_file", F.input_file_name())
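The `source_file` column holds a full URI, while audit reports often need only the file name. The plain-Python sketch below shows one way to extract it. `source_basename` is a hypothetical helper, the example URIs are made up, and the percent-decoding step assumes the path may arrive URL-encoded.

```python
import posixpath
from urllib.parse import unquote, urlparse


def source_basename(uri):
    """Return the final path component of a file URI (hypothetical helper)."""
    path = urlparse(uri).path                   # drops the scheme and bucket/host
    return posixpath.basename(unquote(path))    # decodes %20 and similar escapes


print(source_basename("s3://bucket/uploads/2025/06/report%20final.csv"))
```

Inside Spark, a similar result can usually be had without a Python UDF, for example with `F.element_at(F.split(F.col("source_file"), "/"), -1)`, which keeps the work in the JVM.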