Write PySpark DataFrame to DynamoDB Using Boto3

To write a PySpark DataFrame to an Amazon DynamoDB table, you can use the Boto3 library together with the batch_write_item API. Spark has no built-in DynamoDB data source, so the DataFrame rows must first be converted into DynamoDB's item format and then written in batches, as sketched below.
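
For reference, here is a minimal sketch of what a single batch_write_item request looks like (assuming a table named "YourDynamoDBTable" with a string partition key "id"; your table and attribute names will differ):

import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

# Each item is wrapped in a PutRequest, and every attribute carries a type
# descriptor: "S" for strings, "N" for numbers (numbers are sent as strings).
dynamodb.batch_write_item(RequestItems={
    "YourDynamoDBTable": [
        {"PutRequest": {"Item": {
            "id": {"S": "123"},
            "name": {"S": "John Doe"},
            "age": {"N": "30"}
        }}}
    ]
})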


Steps to Write PySpark DataFrame to DynamoDB

  1. Read/Create a PySpark DataFrame
  2. Convert DataFrame Rows to a Format Suitable for DynamoDB
  3. Use Boto3 to Write Data to DynamoDB in Batches

Example: Writing a PySpark DataFrame to DynamoDB

import boto3
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DynamoDBWriteExample").getOrCreate()

# Sample DataFrame
data = [
    ("123", "John Doe", 30),
    ("456", "Jane Smith", 28),
    ("789", "Mike Johnson", 35)
]

columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)

# Convert DataFrame rows to DynamoDB PutRequest items (collected to the driver)
dynamodb_items = df.rdd.map(lambda row: {
    "PutRequest": {
        "Item": {
            "id": {"S": row["id"]},
            "name": {"S": row["name"]},
            "age": {"N": str(row["age"])}
        }
    }
}).collect()

# Initialize DynamoDB client
dynamodb = boto3.client("dynamodb", region_name="us-east-1")

# Function to batch write to DynamoDB, retrying any unprocessed items
def batch_write(table_name, items):
    request_items = {table_name: items}
    while request_items:
        response = dynamodb.batch_write_item(RequestItems=request_items)
        # DynamoDB may throttle part of a batch; retry whatever was not processed
        request_items = response.get("UnprocessedItems", {})
    return response

# Write data to DynamoDB in batches (25 items per batch)
table_name = "YourDynamoDBTable"
batch_size = 25
for i in range(0, len(dynamodb_items), batch_size):
    batch = dynamodb_items[i:i+batch_size]
    response = batch_write(table_name, batch)
    print(f"Batch {i//batch_size + 1} written:", response)

print("Data successfully written to DynamoDB!")

Where to Use This?

  • ETL Pipelines: Storing processed data in DynamoDB for low-latency access.
  • Streaming Data Processing: Writing real-time data processed via PySpark.
  • Data Aggregation: Storing aggregated metrics for API usage.

Key Considerations

  • DynamoDB’s Write Capacity: Ensure proper provisioned capacity or use on-demand mode.
  • Batch Write Limits: batch_write_item accepts at most 25 put/delete requests (and 16 MB of data) per call, so larger DataFrames must be chunked.
  • Data Serialization: With the low-level client, every attribute must be wrapped in a type descriptor, and numeric values are passed as strings under the "N" type (see the sketch below).
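
If you would rather not hand-write the type descriptors, boto3 ships a TypeSerializer that converts plain Python values into DynamoDB's AttributeValue format. A minimal sketch (the to_dynamodb_item helper is a hypothetical name, not part of boto3):

from boto3.dynamodb.types import TypeSerializer

serializer = TypeSerializer()

def to_dynamodb_item(row):
    # e.g. {"id": "123", "age": 30} -> {"id": {"S": "123"}, "age": {"N": "30"}}
    return {key: serializer.serialize(value) for key, value in row.items()}

print(to_dynamodb_item({"id": "123", "name": "John Doe", "age": 30}))
# Note: TypeSerializer does not accept floats; convert them to Decimal first.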

This approach writes PySpark DataFrames to DynamoDB while handling batching and serialization; for large datasets, the foreachPartition variant keeps the writes distributed across executors. 🚀