Amazon Web Services
Compute
- AWS EC2
- EC2 Instance Types
- EC2 Pricing Models
- EC2 Auto Scaling
- Elastic Load Balancing (ELB)
- AWS Lambda – Serverless Computing
- Amazon Lightsail
- AWS Elastic Beanstalk
- AWS Fargate
- Amazon ECS (Elastic Container Service)
- Amazon EKS (Elastic Kubernetes Service)
DynamoDB
- DynamoDB Global Table vs Regular DynamoDB Table
- DynamoDB Streams
- Athena query data to DynamoDB
- Athena Query Results with DynamoDB
- PySpark DataFrame to DynamoDB
Redshift
Lambda
Glue
Storage
- S3 vs. EBS vs. EFS
- Amazon S3 (Simple Storage Service)
- Amazon S3 Storage Classes
- Amazon EBS (Elastic Block Store)
- Amazon EFS (Elastic File System)
- AWS Storage Gateway
- AWS Snowball
- Amazon FSx
- AWS Backup
Security
Integrating AWS Athena Query Results with DynamoDB using AWS Glue
This program demonstrates a seamless integration of AWS Athena, AWS Glue, and Amazon DynamoDB for efficient data processing and storage. The process involves executing an Athena query using Boto3, retrieving query results from S3, loading data into a Spark DataFrame using GlueContext, and finally writing the transformed data into DynamoDB in batch mode with the help of AWS Glue Data Catalog. This approach ensures scalability, cost-effectiveness, and real-time analytics while leveraging serverless AWS services.
# --- Imports ---------------------------------------------------------------
import sys
import time

import boto3
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions

# --- AWS service clients ---------------------------------------------------
athena_client = boto3.client("athena", region_name="us-east-1")
s3_client = boto3.client("s3", region_name="us-east-1")

# --- Glue / Spark context --------------------------------------------------
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# --- Job configuration -----------------------------------------------------
# NOTE(review): placeholder values — replace with the real database, table,
# and results bucket before running.
DATABASE_NAME = "your_database"
DYNAMODB_TABLE_NAME = "your_dynamodb_table"
S3_OUTPUT_BUCKET = "your-athena-query-results-bucket"
S3_OUTPUT_PREFIX = "athena_results/"
ATHENA_QUERY = "SELECT id, name, age FROM your_athena_table LIMIT 10;"
def run_athena_query():
    """Start the configured Athena query and return its query execution ID.

    Reads the module-level configuration (ATHENA_QUERY, DATABASE_NAME,
    S3_OUTPUT_BUCKET, S3_OUTPUT_PREFIX) and submits the query through the
    module-level ``athena_client``.
    """
    output_location = f"s3://{S3_OUTPUT_BUCKET}/{S3_OUTPUT_PREFIX}"
    submission = athena_client.start_query_execution(
        QueryString=ATHENA_QUERY,
        QueryExecutionContext={"Database": DATABASE_NAME},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return submission["QueryExecutionId"]
def wait_for_query_completion(query_execution_id, timeout=None, poll_interval=2):
    """Poll Athena until the query reaches a terminal state.

    Args:
        query_execution_id: ID returned by ``start_query_execution``.
        timeout: Optional maximum number of seconds to wait. ``None``
            (the default) waits indefinitely, preserving the original
            behavior of this function.
        poll_interval: Seconds to sleep between status polls.

    Returns:
        The terminal state string: "SUCCEEDED", "FAILED", or "CANCELLED".

    Raises:
        TimeoutError: If ``timeout`` is set and the query has not reached
            a terminal state before the deadline.
    """
    deadline = None if timeout is None else time.time() + timeout
    while True:
        response = athena_client.get_query_execution(
            QueryExecutionId=query_execution_id
        )
        status = response["QueryExecution"]["Status"]["State"]
        if status in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return status
        # Guard against an unbounded busy-wait when a deadline was requested.
        if deadline is not None and time.time() >= deadline:
            raise TimeoutError(
                f"Athena query {query_execution_id} did not complete "
                f"within {timeout} seconds (last state: {status})"
            )
        time.sleep(poll_interval)  # Wait before checking again
def read_athena_results_from_s3(query_execution_id):
    """Load the Athena result CSV for *query_execution_id* into a Spark DataFrame.

    Athena writes its result file as ``<prefix><query-id>.csv`` in the
    configured output bucket; the first row is treated as the header.
    """
    result_key = f"{S3_OUTPUT_PREFIX}{query_execution_id}.csv"
    s3_path = f"s3://{S3_OUTPUT_BUCKET}/{result_key}"

    # Read the CSV through the shared Glue/Spark session.
    return spark.read.option("header", "true").csv(s3_path)
def write_to_dynamodb(df):
    """Write a Spark DataFrame into DynamoDB via a Glue DynamicFrame.

    Converts *df* to a DynamicFrame and batch-writes it to the table named
    by the module-level ``DYNAMODB_TABLE_NAME``.
    """
    frame = DynamicFrame.fromDF(df, glueContext, "dynamodb_frame")
    sink_options = {"dynamodb.output.tableName": DYNAMODB_TABLE_NAME}
    glueContext.write_dynamic_frame.from_options(
        frame=frame,
        connection_type="dynamodb",
        connection_options=sink_options,
    )
    print("Data successfully written to DynamoDB")
def main():
    """Run the pipeline end to end: Athena query -> S3 results -> DynamoDB."""
    query_id = run_athena_query()
    print(f"Executing Athena Query: {query_id}")

    status = wait_for_query_completion(query_id)
    if status != "SUCCEEDED":
        print(f"Query failed with status: {status}")
        return

    df = read_athena_results_from_s3(query_id)
    # head(1) only fetches a single row; count() would scan the whole
    # result set just to test for emptiness.
    if df.head(1):
        write_to_dynamodb(df)
    else:
        print("No data retrieved from Athena query.")


if __name__ == "__main__":
    main()