"""Integrate AWS Athena query results with DynamoDB using AWS Glue.

This script executes an Athena query via boto3, waits for the query to
complete, loads the CSV results from S3 into a Spark DataFrame through
GlueContext, and writes the data to DynamoDB in batch using a Glue
DynamicFrame backed by the AWS Glue Data Catalog.
"""

import sys
import time
import boto3
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions

# AWS service clients (region is hard-coded; adjust for other deployments)
athena_client = boto3.client("athena", region_name="us-east-1")
s3_client = boto3.client("s3", region_name="us-east-1")

# AWS Glue / Spark context — created once at module import time
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Configuration — placeholder values to be replaced per environment
DATABASE_NAME = "your_database"                          # Athena/Glue catalog database
DYNAMODB_TABLE_NAME = "your_dynamodb_table"              # destination DynamoDB table
S3_OUTPUT_BUCKET = "your-athena-query-results-bucket"    # bucket for Athena result files
S3_OUTPUT_PREFIX = "athena_results/"                     # key prefix under the bucket
ATHENA_QUERY = "SELECT id, name, age FROM your_athena_table LIMIT 10;"


def run_athena_query():
    """Kick off the configured Athena query and return its execution ID.

    Results are written by Athena to the configured S3 output location.
    """
    result_location = f"s3://{S3_OUTPUT_BUCKET}/{S3_OUTPUT_PREFIX}"
    submission = athena_client.start_query_execution(
        QueryString=ATHENA_QUERY,
        QueryExecutionContext={"Database": DATABASE_NAME},
        ResultConfiguration={"OutputLocation": result_location},
    )
    return submission["QueryExecutionId"]


def wait_for_query_completion(query_execution_id, poll_interval=2, timeout=None):
    """Poll Athena until the query reaches a terminal state.

    Args:
        query_execution_id: ID returned by ``start_query_execution``.
        poll_interval: Seconds to sleep between status checks.
        timeout: Optional overall limit in seconds; ``None`` (the default,
            matching the original behavior) waits indefinitely.

    Returns:
        The terminal status string: "SUCCEEDED", "FAILED", or "CANCELLED".

    Raises:
        TimeoutError: If ``timeout`` elapses before the query finishes.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response["QueryExecution"]["Status"]["State"]
        if status in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return status
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Athena query {query_execution_id} did not finish within {timeout} seconds"
            )
        time.sleep(poll_interval)  # back off before the next status check


def read_athena_results_from_s3(query_execution_id):
    """Load the Athena result CSV for *query_execution_id* into a Spark DataFrame."""
    # Athena names the result object after the query execution ID.
    result_file = f"s3://{S3_OUTPUT_BUCKET}/{S3_OUTPUT_PREFIX}{query_execution_id}.csv"
    return spark.read.option("header", "true").csv(result_file)


def write_to_dynamodb(df):
    """Persist *df* to the configured DynamoDB table via a Glue DynamicFrame."""
    frame = DynamicFrame.fromDF(df, glueContext, "dynamodb_frame")
    sink_options = {"dynamodb.output.tableName": DYNAMODB_TABLE_NAME}
    glueContext.write_dynamic_frame.from_options(
        frame=frame,
        connection_type="dynamodb",
        connection_options=sink_options,
    )
    print("Data successfully written to DynamoDB")


def main():
    """Run the Athena query and copy its results into DynamoDB.

    Pipeline: submit the query, wait for a terminal status, read the
    result CSV from S3, and batch-write non-empty results to DynamoDB.
    """
    query_id = run_athena_query()
    print(f"Executing Athena Query: {query_id}")

    status = wait_for_query_completion(query_id)
    if status != "SUCCEEDED":
        print(f"Query failed with status: {status}")
        return

    df = read_athena_results_from_s3(query_id)
    # head(1) checks emptiness by fetching at most one row, unlike
    # count(), which forces a full scan of the result set.
    if df.head(1):
        write_to_dynamodb(df)
    else:
        print("No data retrieved from Athena query.")


# Script entry point — run the pipeline only when executed directly.
if __name__ == "__main__":
    main()