Program: Save Athena query results to a DynamoDB table

1. Import packages

import boto3
import time
import csv
import io

2. Set up the required configuration

# Configuration
# Athena: the database the query runs in, and the SQL statement itself.
DATABASE_NAME = "your_database"
ATHENA_QUERY = "SELECT id, name, age FROM your_athena_table LIMIT 10;"

# S3: where Athena writes its result files. Keep the trailing "/" on the
# prefix -- result keys are built as f"{S3_OUTPUT_PREFIX}{query_id}.csv".
S3_OUTPUT_BUCKET = "your-athena-query-results-bucket"
S3_OUTPUT_PREFIX = "athena_results/"

# DynamoDB: destination table for the query rows.
TABLE_NAME = "your_dynamodb_table"

3. Run the Athena query

def run_athena_query():
    """Start ATHENA_QUERY in DATABASE_NAME and return its query execution ID.

    Athena is told to write the results under
    s3://S3_OUTPUT_BUCKET/S3_OUTPUT_PREFIX. This function only submits the
    query; completion is checked separately (see wait_for_query_completion).
    """
    output_location = f"s3://{S3_OUTPUT_BUCKET}/{S3_OUTPUT_PREFIX}"
    execution = athena_client.start_query_execution(
        QueryString=ATHENA_QUERY,
        QueryExecutionContext={"Database": DATABASE_NAME},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return execution["QueryExecutionId"]

4. Wait for the query to finish: poll its execution ID until it reaches a terminal state, then pass that state to the next stage


def wait_for_query_completion(query_execution_id, *, poll_interval=2, timeout=None, client=None):
    """Poll Athena until the query reaches a terminal state and return that state.

    Args:
        query_execution_id: ID returned by ``run_athena_query``.
        poll_interval: Seconds to sleep between status checks (default 2).
        timeout: Optional maximum number of seconds to keep polling. ``None``
            (the default) waits indefinitely.
        client: Optional Athena client; defaults to the module-level
            ``athena_client``.

    Returns:
        The final state string: "SUCCEEDED", "FAILED" or "CANCELLED".

    Raises:
        TimeoutError: if ``timeout`` elapses before a terminal state is seen.
    """
    if client is None:
        client = athena_client
    terminal_states = ("SUCCEEDED", "FAILED", "CANCELLED")
    # monotonic() is unaffected by wall-clock changes, so the deadline is reliable.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response["QueryExecution"]["Status"]["State"]
        if status in terminal_states:
            return status
        # The status is always checked at least once before giving up.
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Athena query {query_execution_id} still {status} after {timeout} seconds"
            )
        time.sleep(poll_interval)  # Wait before checking again

5. Fetch the Athena query results and return them as a list of dictionaries

Athena writes each query's output to Amazon S3 as a CSV file named after the query execution ID; this step downloads that file and parses it.


def fetch_query_results(query_execution_id, *, client=None):
    """Download an Athena query's CSV result file from S3 and parse it.

    The file is expected at
    ``s3://S3_OUTPUT_BUCKET/S3_OUTPUT_PREFIX<query_execution_id>.csv``, the
    output location configured in ``run_athena_query``.

    Args:
        query_execution_id: ID returned by ``run_athena_query``.
        client: Optional S3 client; defaults to the module-level ``s3_client``.

    Returns:
        A list with one dict per data row, keyed by the CSV header row. Values
        are not type-converted (``csv.DictReader`` yields strings).
    """
    if client is None:
        client = s3_client
    query_results_path = f"{S3_OUTPUT_PREFIX}{query_execution_id}.csv"

    # Read CSV data from S3; close the HTTP stream even if read/decode fails.
    body = client.get_object(Bucket=S3_OUTPUT_BUCKET, Key=query_results_path)["Body"]
    try:
        content = body.read().decode("utf-8")
    finally:
        body.close()

    # Parse CSV: the first line is used as the header row.
    return list(csv.DictReader(io.StringIO(content)))

6. Write query results to DynamoDB

def write_to_dynamodb(data):
    """Store each row of *data* as an item in the TABLE_NAME DynamoDB table.

    Rows are passed to ``put_item`` unchanged, one call per row; no type
    conversion happens here. A confirmation line is printed after the loop.
    """
    put_item = dynamodb.Table(TABLE_NAME).put_item
    for item in data:
        put_item(Item=item)
    print("Data successfully written to DynamoDB")

7. Main function: the program's entry point

It runs the query, waits for it to finish, and writes the parsed results to DynamoDB using the Boto3 DynamoDB resource.

def main():
    """Run the pipeline: submit the Athena query, wait, then copy rows to DynamoDB.

    Progress and problems are reported with print(). Nothing is written when
    the query does not end in SUCCEEDED or when it returns no rows.
    """
    execution_id = run_athena_query()
    print(f"Executing Athena Query: {execution_id}")

    final_state = wait_for_query_completion(execution_id)
    if final_state != "SUCCEEDED":
        print(f"Query failed with status: {final_state}")
        return

    rows = fetch_query_results(execution_id)
    if not rows:
        print("No data retrieved from Athena query.")
        return
    write_to_dynamodb(rows)

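Full program (this also creates the AWS clients `athena_client`, `s3_client` and `dynamodb` that the snippets above rely on)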

import boto3
import time
import csv
import io

# AWS clients
# Single place to set the region used by every client below, so they cannot
# drift apart when the region is changed.
AWS_REGION = 'us-east-1'
athena_client = boto3.client('athena', region_name=AWS_REGION)
s3_client = boto3.client('s3', region_name=AWS_REGION)
dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)

# Configuration
# Athena: the database the query runs in, and the SQL statement itself.
DATABASE_NAME = "your_database"
ATHENA_QUERY = "SELECT id, name, age FROM your_athena_table LIMIT 10;"

# S3: where Athena writes its result files. Keep the trailing "/" on the
# prefix -- result keys are built as f"{S3_OUTPUT_PREFIX}{query_id}.csv".
S3_OUTPUT_BUCKET = "your-athena-query-results-bucket"
S3_OUTPUT_PREFIX = "athena_results/"

# DynamoDB: destination table for the query rows.
TABLE_NAME = "your_dynamodb_table"


def run_athena_query():
    """Start ATHENA_QUERY in DATABASE_NAME and return its query execution ID.

    Athena is told to write the results under
    s3://S3_OUTPUT_BUCKET/S3_OUTPUT_PREFIX. This function only submits the
    query; completion is checked separately (see wait_for_query_completion).
    """
    output_location = f"s3://{S3_OUTPUT_BUCKET}/{S3_OUTPUT_PREFIX}"
    execution = athena_client.start_query_execution(
        QueryString=ATHENA_QUERY,
        QueryExecutionContext={"Database": DATABASE_NAME},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return execution["QueryExecutionId"]


def wait_for_query_completion(query_execution_id, *, poll_interval=2, timeout=None, client=None):
    """Poll Athena until the query reaches a terminal state and return that state.

    Args:
        query_execution_id: ID returned by ``run_athena_query``.
        poll_interval: Seconds to sleep between status checks (default 2).
        timeout: Optional maximum number of seconds to keep polling. ``None``
            (the default) waits indefinitely.
        client: Optional Athena client; defaults to the module-level
            ``athena_client``.

    Returns:
        The final state string: "SUCCEEDED", "FAILED" or "CANCELLED".

    Raises:
        TimeoutError: if ``timeout`` elapses before a terminal state is seen.
    """
    if client is None:
        client = athena_client
    terminal_states = ("SUCCEEDED", "FAILED", "CANCELLED")
    # monotonic() is unaffected by wall-clock changes, so the deadline is reliable.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response["QueryExecution"]["Status"]["State"]
        if status in terminal_states:
            return status
        # The status is always checked at least once before giving up.
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Athena query {query_execution_id} still {status} after {timeout} seconds"
            )
        time.sleep(poll_interval)  # Wait before checking again


def fetch_query_results(query_execution_id, *, client=None):
    """Download an Athena query's CSV result file from S3 and parse it.

    The file is expected at
    ``s3://S3_OUTPUT_BUCKET/S3_OUTPUT_PREFIX<query_execution_id>.csv``, the
    output location configured in ``run_athena_query``.

    Args:
        query_execution_id: ID returned by ``run_athena_query``.
        client: Optional S3 client; defaults to the module-level ``s3_client``.

    Returns:
        A list with one dict per data row, keyed by the CSV header row. Values
        are not type-converted (``csv.DictReader`` yields strings).
    """
    if client is None:
        client = s3_client
    query_results_path = f"{S3_OUTPUT_PREFIX}{query_execution_id}.csv"

    # Read CSV data from S3; close the HTTP stream even if read/decode fails.
    body = client.get_object(Bucket=S3_OUTPUT_BUCKET, Key=query_results_path)["Body"]
    try:
        content = body.read().decode("utf-8")
    finally:
        body.close()

    # Parse CSV: the first line is used as the header row.
    return list(csv.DictReader(io.StringIO(content)))


def write_to_dynamodb(data):
    """Store each row of *data* as an item in the TABLE_NAME DynamoDB table.

    Rows are passed to ``put_item`` unchanged, one call per row; no type
    conversion happens here. A confirmation line is printed after the loop.
    """
    put_item = dynamodb.Table(TABLE_NAME).put_item
    for item in data:
        put_item(Item=item)
    print("Data successfully written to DynamoDB")


def main():
    """Run the pipeline: submit the Athena query, wait, then copy rows to DynamoDB.

    Progress and problems are reported with print(). Nothing is written when
    the query does not end in SUCCEEDED or when it returns no rows.
    """
    execution_id = run_athena_query()
    print(f"Executing Athena Query: {execution_id}")

    final_state = wait_for_query_completion(execution_id)
    if final_state != "SUCCEEDED":
        print(f"Query failed with status: {final_state}")
        return

    rows = fetch_query_results(execution_id)
    if not rows:
        print("No data retrieved from Athena query.")
        return
    write_to_dynamodb(rows)


if __name__ == "__main__":
    # Run the pipeline only when executed as a script, not when imported.
    main()