AWS
- S3 vs. EBS vs. EFS
- AWS EC2
- AWS EMR
- AWS Glue
- AWS Glue Components
- AWS Glue: Interview Questions and Answers
- AWS Lambda example
- AWS Lambda
- AWS Kinesis Features
- AWS Redshift: Questions and Answers
- Amazon Redshift
- AWS S3
- Step Functions
- Unlocking Efficiency and Flexibility with AWS Step Functions
- AWS Tagging for Cost Management, Resource Optimization, and Security
- AWS Control Tower vs AWS Organizations
- Choosing the Right Orchestration Tool for Your Workflow
- DynamoDB Global Table vs Regular DynamoDB Table
- AWS DynamoDB Streams
- AWS Kinesis
- CloudFront vs Global Accelerator
- AWS Glue: save Athena query data to DynamoDB
- AWS Glue (Spark): save Athena query data to DynamoDB
- PySpark DataFrame to DynamoDB
Write PySpark DataFrame to DynamoDB Using Boto3
To write a PySpark DataFrame to an Amazon DynamoDB table, you can use the Boto3 library along with the batch_write_item API. Since Spark has no built-in DynamoDB sink, you need to convert the DataFrame rows into DynamoDB's attribute-value format and write them in batches.
Steps to Write PySpark DataFrame to DynamoDB
- Read/Create a PySpark DataFrame
- Convert DataFrame Rows to a Format Suitable for DynamoDB
- Use Boto3 to Write Data to DynamoDB in Batches
Example: Writing a PySpark DataFrame to DynamoDB
```python
import boto3
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DynamoDBWriteExample").getOrCreate()

# Sample DataFrame
data = [
    ("123", "John Doe", 30),
    ("456", "Jane Smith", 28),
    ("789", "Mike Johnson", 35),
]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)

# Convert DataFrame rows to DynamoDB PutRequest items (low-level attribute-value format)
dynamodb_items = df.rdd.map(lambda row: {
    "PutRequest": {
        "Item": {
            "id": {"S": row["id"]},
            "name": {"S": row["name"]},
            "age": {"N": str(row["age"])},
        }
    }
}).collect()

# Initialize DynamoDB client
dynamodb = boto3.client("dynamodb", region_name="us-east-1")

# Function to batch write to DynamoDB
def batch_write(table_name, items):
    table_items = {"RequestItems": {table_name: items}}
    response = dynamodb.batch_write_item(**table_items)
    return response

# Write data to DynamoDB in batches (max 25 items per batch)
table_name = "YourDynamoDBTable"
batch_size = 25
for i in range(0, len(dynamodb_items), batch_size):
    batch = dynamodb_items[i:i + batch_size]
    response = batch_write(table_name, batch)
    print(f"Batch {i // batch_size + 1} written:", response)

print("Data successfully written to DynamoDB!")
```
Where to Use This?
- ETL Pipelines: Storing processed data in DynamoDB for low-latency access.
- Streaming Data Processing: Writing real-time data processed via PySpark Structured Streaming (see the streaming sketch after this list).
- Data Aggregation: Storing aggregated metrics for API usage.
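For the streaming case, one common pattern (an assumption, not covered by the original example) is Structured Streaming's foreachBatch, which hands each micro-batch to the partition-level writer sketched above. The source path, schema, and checkpoint location below are placeholders:

```python
# Hypothetical JSON source on S3; the schema matches the sample DataFrame.
stream_df = (
    spark.readStream
    .schema("id STRING, name STRING, age INT")
    .json("s3://your-bucket/incoming/")
)

# Each micro-batch is written to DynamoDB via the write_partition sketch above.
query = (
    stream_df.writeStream
    .foreachBatch(lambda batch_df, batch_id: batch_df.foreachPartition(write_partition))
    .option("checkpointLocation", "s3://your-bucket/checkpoints/dynamodb-sink/")
    .start()
)
```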
Key Considerations
- DynamoDB’s Write Capacity: Ensure proper provisioned capacity or use on-demand mode.
- Batch Write Limits: batch_write_item accepts at most 25 items per request and can return UnprocessedItems when throttled (see the retry sketch after this list).
- Data Serialization: The low-level client represents numbers as strings (for example, {"N": "30"}), so convert numeric values to strings before inserting.
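On the batch write limit: the simple loop above silently drops any UnprocessedItems that DynamoDB returns. A minimal retry sketch, reusing the dynamodb client from the example; batch_write_with_retry and max_retries are illustrative names:

```python
import time

def batch_write_with_retry(table_name, items, max_retries=5):
    # Resubmit whatever DynamoDB reports back as UnprocessedItems,
    # backing off exponentially between attempts.
    request = {table_name: items}
    for attempt in range(max_retries):
        response = dynamodb.batch_write_item(RequestItems=request)
        unprocessed = response.get("UnprocessedItems", {})
        if not unprocessed:
            return
        request = unprocessed
        time.sleep(2 ** attempt)
    raise RuntimeError(f"Unprocessed items remain after {max_retries} retries")
```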
This approach efficiently writes PySpark DataFrames to DynamoDB while handling batch writes and serialization. 🚀