Amazon Web Services
Compute
- AWS EC2
- EC2 Instance Types
- EC2 Pricing Models
- EC2 Auto Scaling
- Elastic Load Balancing (ELB)
- AWS Lambda – Serverless Computing
- Amazon Lightsail
- AWS Elastic Beanstalk
- AWS Fargate
- Amazon ECS (Elastic Container Service)
- Amazon EKS (Elastic Kubernetes Service)
DynamoDB
- DynamoDB Global Table vs Regular DynamoDB Table
- DynamoDB Streams
- Querying DynamoDB Data with Athena
- Athena Query Results with DynamoDB
- PySpark DataFrame to DynamoDB
Redshift
Lambda
Glue
Storage
- S3 vs. EBS vs. EFS
- Amazon S3 (Simple Storage Service)
- Amazon S3 Storage Classes
- Amazon EBS (Elastic Block Store)
- Amazon EFS (Elastic File System)
- AWS Storage Gateway
- AWS Snowball
- Amazon FSx
- AWS Backup
Security
Write PySpark DataFrame to DynamoDB Using Boto3
To write a PySpark DataFrame to an Amazon DynamoDB table, you can use the Boto3 library along with the batch_write_item API. Since Spark does not ship with a native DynamoDB connector, you need to convert the DataFrame rows into DynamoDB's attribute-value format and write them in batches.
Steps to Write PySpark DataFrame to DynamoDB
- Read/Create a PySpark DataFrame
- Convert DataFrame Rows to a Format Suitable for DynamoDB
- Use Boto3 to Write Data to DynamoDB in Batches
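The conversion in step 2 can be sketched as a small standalone helper (illustrative, not part of Boto3) that maps a plain row dict to DynamoDB's low-level attribute-value format:

```python
def to_dynamodb_item(row):
    """Map a plain dict to DynamoDB's low-level attribute-value format.

    Strings use the "S" type, numbers use "N" (passed as strings),
    and booleans use "BOOL".
    """
    item = {}
    for key, value in row.items():
        if isinstance(value, bool):  # check bool first: bool is a subclass of int
            item[key] = {"BOOL": value}
        elif isinstance(value, (int, float)):
            item[key] = {"N": str(value)}
        else:
            item[key] = {"S": str(value)}
    return item

print(to_dynamodb_item({"id": "123", "name": "John Doe", "age": 30}))
# → {'id': {'S': '123'}, 'name': {'S': 'John Doe'}, 'age': {'N': '30'}}
```

The full example below inlines this same mapping inside an RDD lambda so the conversion runs in parallel across Spark workers.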
Example: Writing a PySpark DataFrame to DynamoDB
import boto3
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DynamoDBWriteExample").getOrCreate()

# Sample DataFrame
data = [
    ("123", "John Doe", 30),
    ("456", "Jane Smith", 28),
    ("789", "Mike Johnson", 35)
]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)

# Convert DataFrame rows to DynamoDB PutRequest entries
dynamodb_items = df.rdd.map(lambda row: {
    "PutRequest": {
        "Item": {
            "id": {"S": row["id"]},
            "name": {"S": row["name"]},
            "age": {"N": str(row["age"])}
        }
    }
}).collect()

# Initialize DynamoDB client
dynamodb = boto3.client("dynamodb", region_name="us-east-1")

# Batch write to DynamoDB; in production, check the response for
# UnprocessedItems and retry them
def batch_write(table_name, items):
    return dynamodb.batch_write_item(RequestItems={table_name: items})

# Write data to DynamoDB in batches (25 items per batch)
table_name = "YourDynamoDBTable"
batch_size = 25
for i in range(0, len(dynamodb_items), batch_size):
    batch = dynamodb_items[i:i + batch_size]
    response = batch_write(table_name, batch)
    print(f"Batch {i // batch_size + 1} written:", response)

print("Data successfully written to DynamoDB!")
Where to Use This?
- ETL Pipelines: Storing processed data in DynamoDB for low-latency access.
- Streaming Data Processing: Writing real-time data processed via PySpark.
- Data Aggregation: Storing aggregated metrics for API usage.
Key Considerations
- DynamoDB’s Write Capacity: Ensure proper provisioned capacity or use on-demand mode.
- Batch Write Limits: batch_write_item accepts at most 25 items (and 16 MB of data) per request; unprocessed items are returned for retry.
- Data Serialization: DynamoDB's low-level API represents numbers as strings (e.g., {"N": "30"}), so convert numeric values before inserting.
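Because of the 25-item limit, every write loop needs the same chunking logic used in the example above. As a standalone sketch (25 is DynamoDB's documented per-request maximum):

```python
def chunk_items(items, batch_size=25):
    # batch_write_item accepts at most 25 put/delete requests per call,
    # so split the full item list into batches no larger than that
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

# 60 items split into batches of 25, 25, and 10
print([len(batch) for batch in chunk_items(list(range(60)))])
# → [25, 25, 10]
```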
This approach efficiently writes PySpark DataFrames to DynamoDB while handling batch writes and serialization. 🚀