AWS
- S3 vs. EBS vs. EFS
- AWS EC2
- AWS EMR
- AWS Glue
- AWS Glue Component
- AWS Glue: Interview Questions and Answers
- AWS Lambda example
- AWS Lambda
- AWS Kinesis Features
- AWS Redshift: Questions and Answers
- Amazon Redshift
- AWS S3
- Step Functions
- Unlocking Efficiency and Flexibility with AWS Step Functions
- AWS Tagging for Cost Management, Resource Optimization, and Security
- AWS Control Tower vs AWS Organizations
- Choosing the Right Orchestration Tool for Your Workflow
- DynamoDB Global Table vs Regular DynamoDB Table
- AWS DynamoDB Streams
- AWS Kinesis
- CloudFront vs Global Accelerator
- AWS Glue: save Athena query data to DynamoDB
- AWS Glue(spark): save Athena query data to DynamoDB
- PySpark DataFrame to DynamoDB
Program: Save Athena query data to DynamoDB Table
1. Import Package
import boto3
import time
import csv
import io
2. Setup required configuration object
# Configuration
# Glue/Athena database that contains the source table.
DATABASE_NAME = "your_database"
# Target DynamoDB table that receives the query results.
TABLE_NAME = "your_dynamodb_table"
# Bucket and key prefix where Athena writes its CSV query output.
S3_OUTPUT_BUCKET = "your-athena-query-results-bucket"
S3_OUTPUT_PREFIX = "athena_results/"
# Query to run; each selected column becomes a DynamoDB item attribute.
ATHENA_QUERY = "SELECT id, name, age FROM your_athena_table LIMIT 10;"
3. Run the Athena query
def run_athena_query():
    """Start the configured Athena query and return its execution ID."""
    output_location = f"s3://{S3_OUTPUT_BUCKET}/{S3_OUTPUT_PREFIX}"
    result = athena_client.start_query_execution(
        QueryString=ATHENA_QUERY,
        QueryExecutionContext={"Database": DATABASE_NAME},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return result["QueryExecutionId"]
4. This method polls the query execution until it finishes and returns the final status to the next stage
def wait_for_query_completion(query_execution_id, poll_seconds=2, timeout_seconds=None):
    """Poll Athena until the query reaches a terminal state.

    Args:
        query_execution_id: ID returned by start_query_execution.
        poll_seconds: Delay between status checks (seconds).
        timeout_seconds: Optional overall limit; None waits indefinitely
            (the original behavior).

    Returns:
        The terminal state: "SUCCEEDED", "FAILED", or "CANCELLED".

    Raises:
        TimeoutError: If timeout_seconds elapses before a terminal state.
    """
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
    while True:
        response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response["QueryExecution"]["Status"]["State"]
        if status in ("SUCCEEDED", "FAILED", "CANCELLED"):
            if status != "SUCCEEDED":
                # Surface the failure reason instead of silently discarding it.
                reason = response["QueryExecution"]["Status"].get("StateChangeReason", "")
                if reason:
                    print(f"Athena query {status}: {reason}")
            return status
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError(
                f"Athena query {query_execution_id} did not finish within {timeout_seconds}s"
            )
        time.sleep(poll_seconds)  # Wait before checking again
5. Fetch results from Athena query and return as a list of dictionaries
Retrieve the query results from Amazon S3 (where Athena stores output).
def fetch_query_results(query_execution_id):
    """Download the query's CSV result file from S3 and parse it into dicts."""
    # Athena names its output file "<query_execution_id>.csv" under the prefix.
    result_key = f"{S3_OUTPUT_PREFIX}{query_execution_id}.csv"
    response = s3_client.get_object(Bucket=S3_OUTPUT_BUCKET, Key=result_key)
    csv_text = response["Body"].read().decode("utf-8")
    # DictReader maps the header row to keys for every data row.
    return list(csv.DictReader(io.StringIO(csv_text)))
6. Write query results to DynamoDB
def write_to_dynamodb(data):
    """Write query results to DynamoDB.

    Uses batch_writer so items are sent in batches of up to 25 per
    BatchWriteItem call (with automatic retry of unprocessed items)
    instead of one PutItem request per row.

    Args:
        data: List of dicts; each dict becomes one DynamoDB item.
            NOTE(review): values parsed from Athena's CSV output are all
            strings — confirm the table's key attributes are string-typed.
    """
    if not data:
        # Nothing to write; avoid claiming success on an empty batch.
        print("No items to write to DynamoDB")
        return
    table = dynamodb.Table(TABLE_NAME)
    with table.batch_writer() as batch:
        for row in data:
            batch.put_item(Item=row)
    print("Data successfully written to DynamoDB")
7. Main method: entry point of the program
Parses the results and writes them to DynamoDB using the Boto3 DynamoDB resource
def main():
    """Run the Athena query and copy its results into DynamoDB."""
    execution_id = run_athena_query()
    print(f"Executing Athena Query: {execution_id}")
    final_status = wait_for_query_completion(execution_id)
    # Guard clauses: bail out early on failure or empty result set.
    if final_status != "SUCCEEDED":
        print(f"Query failed with status: {final_status}")
        return
    rows = fetch_query_results(execution_id)
    if not rows:
        print("No data retrieved from Athena query.")
        return
    write_to_dynamodb(rows)
Full program
import boto3
import time
import csv
import io
# AWS clients
# Low-level clients for Athena (query execution) and S3 (result download);
# high-level resource for DynamoDB so we can use Table(...).put_item.
athena_client = boto3.client('athena', region_name='us-east-1')
s3_client = boto3.client('s3', region_name='us-east-1')
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
# Configuration
# Glue/Athena database that contains the source table.
DATABASE_NAME = "your_database"
# Target DynamoDB table that receives the query results.
TABLE_NAME = "your_dynamodb_table"
# Bucket and key prefix where Athena writes its CSV query output.
S3_OUTPUT_BUCKET = "your-athena-query-results-bucket"
S3_OUTPUT_PREFIX = "athena_results/"
# Query to run; each selected column becomes a DynamoDB item attribute.
ATHENA_QUERY = "SELECT id, name, age FROM your_athena_table LIMIT 10;"
def run_athena_query():
    """Start the configured Athena query and return its execution ID."""
    output_location = f"s3://{S3_OUTPUT_BUCKET}/{S3_OUTPUT_PREFIX}"
    result = athena_client.start_query_execution(
        QueryString=ATHENA_QUERY,
        QueryExecutionContext={"Database": DATABASE_NAME},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return result["QueryExecutionId"]
def wait_for_query_completion(query_execution_id, poll_seconds=2, timeout_seconds=None):
    """Poll Athena until the query reaches a terminal state.

    Args:
        query_execution_id: ID returned by start_query_execution.
        poll_seconds: Delay between status checks (seconds).
        timeout_seconds: Optional overall limit; None waits indefinitely
            (the original behavior).

    Returns:
        The terminal state: "SUCCEEDED", "FAILED", or "CANCELLED".

    Raises:
        TimeoutError: If timeout_seconds elapses before a terminal state.
    """
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
    while True:
        response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response["QueryExecution"]["Status"]["State"]
        if status in ("SUCCEEDED", "FAILED", "CANCELLED"):
            if status != "SUCCEEDED":
                # Surface the failure reason instead of silently discarding it.
                reason = response["QueryExecution"]["Status"].get("StateChangeReason", "")
                if reason:
                    print(f"Athena query {status}: {reason}")
            return status
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError(
                f"Athena query {query_execution_id} did not finish within {timeout_seconds}s"
            )
        time.sleep(poll_seconds)  # Wait before checking again
def fetch_query_results(query_execution_id):
    """Download the query's CSV result file from S3 and parse it into dicts."""
    # Athena names its output file "<query_execution_id>.csv" under the prefix.
    result_key = f"{S3_OUTPUT_PREFIX}{query_execution_id}.csv"
    response = s3_client.get_object(Bucket=S3_OUTPUT_BUCKET, Key=result_key)
    csv_text = response["Body"].read().decode("utf-8")
    # DictReader maps the header row to keys for every data row.
    return list(csv.DictReader(io.StringIO(csv_text)))
def write_to_dynamodb(data):
    """Write query results to DynamoDB.

    Uses batch_writer so items are sent in batches of up to 25 per
    BatchWriteItem call (with automatic retry of unprocessed items)
    instead of one PutItem request per row.

    Args:
        data: List of dicts; each dict becomes one DynamoDB item.
            NOTE(review): values parsed from Athena's CSV output are all
            strings — confirm the table's key attributes are string-typed.
    """
    if not data:
        # Nothing to write; avoid claiming success on an empty batch.
        print("No items to write to DynamoDB")
        return
    table = dynamodb.Table(TABLE_NAME)
    with table.batch_writer() as batch:
        for row in data:
            batch.put_item(Item=row)
    print("Data successfully written to DynamoDB")
def main():
    """Run the Athena query and copy its results into DynamoDB."""
    execution_id = run_athena_query()
    print(f"Executing Athena Query: {execution_id}")
    final_status = wait_for_query_completion(execution_id)
    # Guard clauses: bail out early on failure or empty result set.
    if final_status != "SUCCEEDED":
        print(f"Query failed with status: {final_status}")
        return
    rows = fetch_query_results(execution_id)
    if not rows:
        print("No data retrieved from Athena query.")
        return
    write_to_dynamodb(rows)
if __name__ == "__main__":
main()