AWS
- S3 vs. EBS vs. EFS
- AWS EC2
- AWS EMR
- AWS Glue
- AWS Glue Component
- AWS Glue: Interview Questions and Answers
- AWS Lambda example
- AWS Lambda
- AWS Kinesis Features
- AWS Redshift: Questions and Answers
- Amazon Redshift
- AWS S3
- Step Functions
- Unlocking Efficiency and Flexibility with AWS Step Functions
- AWS Tagging for Cost Management, Resource Optimization, and Security
- AWS Control Tower vs AWS Organizations
- Choosing the Right Orchestration Tool for Your Workflow
- DynamoDB Global Table vs Regular DynamoDB Table
- AWS DynamoDB Streams
- AWS Kinesis
- CloudFront vs Global Accelerator
- AWS Glue: save Athena query data to DynamoDB
- AWS Glue (Spark): save Athena query data to DynamoDB
- PySpark DataFrame to DynamoDB
Write PySpark DataFrame to DynamoDB Using Boto3
To write a PySpark DataFrame to an Amazon DynamoDB table, you can use the Boto3 library with the batch_write_item API. Because open-source Spark has no built-in DynamoDB sink, you first convert the DataFrame rows into DynamoDB's attribute-value format and then submit them in batches.
Steps to Write PySpark DataFrame to DynamoDB
- Read/Create a PySpark DataFrame
- Convert DataFrame Rows to a Format Suitable for DynamoDB
- Use Boto3 to Write Data to DynamoDB in Batches
Example: Writing a PySpark DataFrame to DynamoDB
import boto3
from pyspark.sql import SparkSession

# Initialize the Spark session
spark = SparkSession.builder.appName("DynamoDBWriteExample").getOrCreate()

# Sample DataFrame
data = [
    ("123", "John Doe", 30),
    ("456", "Jane Smith", 28),
    ("789", "Mike Johnson", 35)
]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)

# Convert each row into a DynamoDB PutRequest using the low-level
# attribute-value format ("S" = string, "N" = number passed as a string)
dynamodb_items = df.rdd.map(lambda row: {
    "PutRequest": {
        "Item": {
            "id": {"S": row["id"]},
            "name": {"S": row["name"]},
            "age": {"N": str(row["age"])}
        }
    }
}).collect()

# Initialize the DynamoDB client
dynamodb = boto3.client("dynamodb", region_name="us-east-1")

# Batch-write a list of PutRequests to a single table
def batch_write(table_name, items):
    response = dynamodb.batch_write_item(RequestItems={table_name: items})
    return response

# Write data to DynamoDB in batches (batch_write_item accepts at most 25 items)
table_name = "YourDynamoDBTable"
batch_size = 25

for i in range(0, len(dynamodb_items), batch_size):
    batch = dynamodb_items[i:i + batch_size]
    response = batch_write(table_name, batch)
    print(f"Batch {i // batch_size + 1} written:", response)

print("Data successfully written to DynamoDB!")
Where to Use This?
- ETL Pipelines: Storing processed data in DynamoDB for low-latency access.
- Streaming Data Processing: Writing real-time data processed via PySpark (see the streaming sketch after this list).
- Data Aggregation: Storing aggregated metrics for API usage.
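For the streaming use case, here is a minimal Structured Streaming sketch. The built-in "rate" test source and the derived columns stand in for a real Kafka/Kinesis stream, and it reuses the hypothetical write_partition helper from the earlier sketch:

from pyspark.sql import functions as F

# Illustrative stream: "rate" generates rows locally; a real job would read a message bus
stream_df = (
    spark.readStream.format("rate").option("rowsPerSecond", 5).load()
    .select(
        F.col("value").cast("string").alias("id"),
        F.lit("generated").alias("name"),
        (F.col("value") % 100).alias("age"),
    )
)

def write_micro_batch(batch_df, batch_id):
    # Each micro-batch is a normal DataFrame, so the partition writer applies directly
    batch_df.foreachPartition(write_partition)

query = (
    stream_df.writeStream
    .foreachBatch(write_micro_batch)
    .option("checkpointLocation", "/tmp/dynamodb-stream-checkpoint")  # placeholder path
    .start()
)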
Key Considerations
- DynamoDB’s Write Capacity: Ensure the table has enough provisioned write capacity or uses on-demand mode; throttled requests are returned as UnprocessedItems (see the retry sketch after this list).
- Batch Write Limits: batch_write_item accepts at most 25 items per request, which is why the example chunks the data.
- Data Serialization: The low-level client uses DynamoDB's attribute-value format, so numbers must be passed as strings under the "N" type (as in {"N": str(row["age"])}).
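On the write-capacity point: batch_write_item does not raise an error when DynamoDB throttles part of a batch; it returns the leftover requests under UnprocessedItems, which the basic example above only prints. A minimal retry wrapper with exponential backoff could look like this (the function name and retry limits are illustrative, not part of the Boto3 API):

import time
import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

def batch_write_with_retry(table_name, items, max_retries=5):
    # Start with the full batch; each retry resubmits only what DynamoDB left unprocessed
    request_items = {table_name: items}
    for attempt in range(max_retries):
        response = dynamodb.batch_write_item(RequestItems=request_items)
        unprocessed = response.get("UnprocessedItems", {})
        if not unprocessed:
            return
        # Exponential backoff before retrying throttled items
        time.sleep(0.1 * (2 ** attempt))
        request_items = unprocessed
    raise RuntimeError(f"Items still unprocessed after {max_retries} retries")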
This approach efficiently writes PySpark DataFrames to DynamoDB while handling batch writes and serialization. 🚀