Amazon Web Services
Compute
- AWS EC2
- EC2 Instance Types
- EC2 Pricing Models
- EC2 Auto Scaling
- Elastic Load Balancing (ELB)
- AWS Lambda – Serverless Computing
- Amazon Lightsail
- AWS Elastic Beanstalk
- AWS Fargate
- Amazon ECS (Elastic Container Service)
- Amazon EKS (Elastic Kubernetes Service)
DynamoDB
- DynamoDB Global Table vs Regular DynamoDB Table
- DynamoDB Streams
- Save Athena query data to DynamoDB
- Athena Query Results with DynamoDB
- PySpark DataFrame to DynamoDB
Redshift
Lambda
Glue
Lambda
Storage
- S3 vs. EBS vs. EFS
- Amazon S3 (Simple Storage Service)
- Amazon S3 Storage Classes
- Amazon EBS (Elastic Block Store)
- Amazon EFS (Elastic File System)
- AWS Storage Gateway
- AWS Snowball
- Amazon FSx
- AWS Backup
Security
Program: Save Athena query data to DynamoDB Table
1. Import required packages
# Standard-library modules plus the AWS SDK for Python (boto3).
import boto3
import time
import csv
import io
2. Set up the required configuration values
# Configuration — replace these placeholder values with your own resource names.
DATABASE_NAME = "your_database"          # Athena (Glue catalog) database
TABLE_NAME = "your_dynamodb_table"       # Target DynamoDB table name
S3_OUTPUT_BUCKET = "your-athena-query-results-bucket"  # Bucket where Athena writes results
S3_OUTPUT_PREFIX = "athena_results/"     # Key prefix for Athena's CSV output (must end with "/")
ATHENA_QUERY = "SELECT id, name, age FROM your_athena_table LIMIT 10;"
3. Run the Athena query
def run_athena_query():
    """Submit the Athena query and return its query execution ID.

    Returns:
        str: The QueryExecutionId assigned by Athena, used to poll for
        completion and to locate the result CSV in S3.
    """
    response = athena_client.start_query_execution(
        QueryString=ATHENA_QUERY,
        QueryExecutionContext={"Database": DATABASE_NAME},
        # Athena writes its CSV result set under this S3 location.
        ResultConfiguration={"OutputLocation": f"s3://{S3_OUTPUT_BUCKET}/{S3_OUTPUT_PREFIX}"}
    )
    return response['QueryExecutionId']
4. This method polls the given query execution ID until the query finishes and passes the final status to the next stage.
def wait_for_query_completion(query_execution_id, poll_interval=2):
    """Block until the Athena query reaches a terminal state.

    Args:
        query_execution_id: ID returned by start_query_execution.
        poll_interval: Seconds to sleep between status checks (default 2,
            matching the original hard-coded delay).

    Returns:
        str: The terminal state — "SUCCEEDED", "FAILED", or "CANCELLED".
    """
    while True:
        response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response["QueryExecution"]["Status"]["State"]
        if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
            return status
        time.sleep(poll_interval)  # Wait before checking again
5. Fetch results from Athena query and return as a list of dictionaries
Retrieve the query results from Amazon S3 (where Athena stores output).
def fetch_query_results(query_execution_id):
    """Fetch results of an Athena query and return them as a list of dicts.

    Athena stores the result set as <prefix><query_execution_id>.csv in the
    configured output bucket; this reads that object directly from S3.

    Args:
        query_execution_id: ID of a query that has already SUCCEEDED.

    Returns:
        list[dict]: One dict per row, keyed by the CSV header columns.
        Note: csv.DictReader yields every value as str.
    """
    query_results_path = f"{S3_OUTPUT_PREFIX}{query_execution_id}.csv"

    # Read the raw CSV content from S3.
    obj = s3_client.get_object(Bucket=S3_OUTPUT_BUCKET, Key=query_results_path)
    content = obj['Body'].read().decode('utf-8')

    # Parse the CSV: DictReader uses the header row as the dict keys.
    return list(csv.DictReader(io.StringIO(content)))
6. Write query results to DynamoDB
def write_to_dynamodb(data):
    """Write query result rows to the DynamoDB table.

    Uses batch_writer() so items are buffered and sent as BatchWriteItem
    requests (up to 25 items each, with automatic retry of unprocessed
    items) instead of one PutItem call per row.

    Args:
        data: Iterable of dicts; each dict becomes one DynamoDB item.
            NOTE(review): rows parsed from CSV have all-string values —
            cast numeric attributes first if the table expects Number types.
    """
    table = dynamodb.Table(TABLE_NAME)
    with table.batch_writer() as batch:
        for row in data:
            batch.put_item(Item=row)
    print("Data successfully written to DynamoDB")
7. Main method — the entry point that ties the stages together
Parse the results and write them to DynamoDB using the Boto3 DynamoDB resource.
def main():
    """Run the Athena query and copy its results into DynamoDB.

    Pipeline: submit query -> poll until terminal -> fetch CSV from S3 ->
    write rows to DynamoDB. Exits early with a message if the query does
    not succeed or returns no rows.
    """
    query_id = run_athena_query()
    print(f"Executing Athena Query: {query_id}")

    status = wait_for_query_completion(query_id)
    if status != "SUCCEEDED":
        print(f"Query failed with status: {status}")
        return

    data = fetch_query_results(query_id)
    if data:
        write_to_dynamodb(data)
    else:
        print("No data retrieved from Athena query.")
Full program
# Standard-library modules plus the AWS SDK for Python (boto3).
import boto3
import time
import csv
import io
# AWS clients — all bound to us-east-1; change the region as needed.
athena_client = boto3.client('athena', region_name='us-east-1')
s3_client = boto3.client('s3', region_name='us-east-1')
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
# Configuration — replace these placeholder values with your own resource names.
DATABASE_NAME = "your_database"          # Athena (Glue catalog) database
TABLE_NAME = "your_dynamodb_table"       # Target DynamoDB table name
S3_OUTPUT_BUCKET = "your-athena-query-results-bucket"  # Bucket where Athena writes results
S3_OUTPUT_PREFIX = "athena_results/"     # Key prefix for Athena's CSV output (must end with "/")
ATHENA_QUERY = "SELECT id, name, age FROM your_athena_table LIMIT 10;"
def run_athena_query():
    """Submit the Athena query and return its query execution ID.

    Returns:
        str: The QueryExecutionId assigned by Athena, used to poll for
        completion and to locate the result CSV in S3.
    """
    response = athena_client.start_query_execution(
        QueryString=ATHENA_QUERY,
        QueryExecutionContext={"Database": DATABASE_NAME},
        # Athena writes its CSV result set under this S3 location.
        ResultConfiguration={"OutputLocation": f"s3://{S3_OUTPUT_BUCKET}/{S3_OUTPUT_PREFIX}"}
    )
    return response['QueryExecutionId']
def wait_for_query_completion(query_execution_id, poll_interval=2):
    """Block until the Athena query reaches a terminal state.

    Args:
        query_execution_id: ID returned by start_query_execution.
        poll_interval: Seconds to sleep between status checks (default 2,
            matching the original hard-coded delay).

    Returns:
        str: The terminal state — "SUCCEEDED", "FAILED", or "CANCELLED".
    """
    while True:
        response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response["QueryExecution"]["Status"]["State"]
        if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
            return status
        time.sleep(poll_interval)  # Wait before checking again
def fetch_query_results(query_execution_id):
    """Fetch results of an Athena query and return them as a list of dicts.

    Athena stores the result set as <prefix><query_execution_id>.csv in the
    configured output bucket; this reads that object directly from S3.

    Args:
        query_execution_id: ID of a query that has already SUCCEEDED.

    Returns:
        list[dict]: One dict per row, keyed by the CSV header columns.
        Note: csv.DictReader yields every value as str.
    """
    query_results_path = f"{S3_OUTPUT_PREFIX}{query_execution_id}.csv"

    # Read the raw CSV content from S3.
    obj = s3_client.get_object(Bucket=S3_OUTPUT_BUCKET, Key=query_results_path)
    content = obj['Body'].read().decode('utf-8')

    # Parse the CSV: DictReader uses the header row as the dict keys.
    return list(csv.DictReader(io.StringIO(content)))
def write_to_dynamodb(data):
    """Write query result rows to the DynamoDB table.

    Uses batch_writer() so items are buffered and sent as BatchWriteItem
    requests (up to 25 items each, with automatic retry of unprocessed
    items) instead of one PutItem call per row.

    Args:
        data: Iterable of dicts; each dict becomes one DynamoDB item.
            NOTE(review): rows parsed from CSV have all-string values —
            cast numeric attributes first if the table expects Number types.
    """
    table = dynamodb.Table(TABLE_NAME)
    with table.batch_writer() as batch:
        for row in data:
            batch.put_item(Item=row)
    print("Data successfully written to DynamoDB")
def main():
    """Run the Athena query and copy its results into DynamoDB.

    Pipeline: submit query -> poll until terminal -> fetch CSV from S3 ->
    write rows to DynamoDB. Exits early with a message if the query does
    not succeed or returns no rows.
    """
    query_id = run_athena_query()
    print(f"Executing Athena Query: {query_id}")

    status = wait_for_query_completion(query_id)
    if status != "SUCCEEDED":
        print(f"Query failed with status: {status}")
        return

    data = fetch_query_results(query_id)
    if data:
        write_to_dynamodb(data)
    else:
        print("No data retrieved from Athena query.")
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()