Cloud  /  AWS

AWS Amazon Web Services 61 guides · updated 2026

Hands-on guides to compute, storage, databases, networking, and serverless on the world's most widely adopted cloud platform.

AWS Lambda Examples: Real Code Patterns for S3, DynamoDB, API Gateway, and SNS

Code examples are the fastest way to understand how Lambda actually works in practice. Each example below is a self-contained pattern you can adapt — with the required IAM permissions and deployment command included.

Example 1: S3 Upload Trigger — Document Processing

A common pattern: when a document lands in S3, Lambda extracts text, counts words, and stores the result in DynamoDB.

import boto3
import json
# Init code — runs once per execution environment
s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('document-analysis')
def lambda_handler(event, context):
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
# Download the file
obj = s3.get_object(Bucket=bucket, Key=key)
content = obj['Body'].read().decode('utf-8')
# Simple analysis
word_count = len(content.split())
char_count = len(content)
lines = content.count('\n') + 1
# Store result
doc_id = key.replace('/', '_').replace('.', '_')
table.put_item(Item={
'docId': doc_id,
'sourceKey': key,
'wordCount': word_count,
'charCount': char_count,
'lineCount': lines,
})
print(f"Processed {key}: {word_count} words")
return {'processed': len(event['Records'])}

Required IAM permissions:

{
"Effect": "Allow",
"Action": ["s3:GetObject"],
"Resource": "arn:aws:s3:::my-upload-bucket/*"
},
{
"Effect": "Allow",
"Action": ["dynamodb:PutItem"],
"Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/document-analysis"
}

Deploy and configure S3 trigger:

Terminal window
zip function.zip lambda_function.py
aws lambda create-function \
--function-name doc-processor \
--runtime python3.12 \
--role arn:aws:iam::123456789012:role/doc-processor-role \
--handler lambda_function.lambda_handler \
--zip-file fileb://function.zip
# Allow S3 to invoke the function
aws lambda add-permission \
--function-name doc-processor \
--statement-id s3-trigger \
--action lambda:InvokeFunction \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::my-upload-bucket
# Configure S3 event notification
aws s3api put-bucket-notification-configuration \
--bucket my-upload-bucket \
--notification-configuration '{
"LambdaFunctionConfigurations": [{
"LambdaFunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:doc-processor",
"Events": ["s3:ObjectCreated:*"],
"Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": ".txt"}]}}
}]
}'

Example 2: DynamoDB Stream — Audit Log

DynamoDB Streams capture every insert, update, and delete. This function writes every change to a separate audit table.

import boto3
from datetime import datetime
dynamodb = boto3.resource('dynamodb')
audit_table = dynamodb.Table('audit-log')
def lambda_handler(event, context):
audit_items = []
for record in event['Records']:
if record['eventName'] == 'REMOVE':
image = record['dynamodb'].get('OldImage', {})
else:
image = record['dynamodb'].get('NewImage', {})
# Deserialise DynamoDB typed values
entity_id = image.get('id', {}).get('S', 'unknown')
audit_items.append({
'auditId': context.aws_request_id + '_' + record['eventID'],
'entityId': entity_id,
'eventType': record['eventName'],
'timestamp': datetime.utcnow().isoformat(),
'sequenceNumber': record['dynamodb']['SequenceNumber'],
})
# Batch write to audit table
with audit_table.batch_writer() as batch:
for item in audit_items:
batch.put_item(Item=item)
print(f"Wrote {len(audit_items)} audit records")
return {'auditRecords': len(audit_items)}

Required IAM permissions:

{
"Effect": "Allow",
"Action": [
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams"
],
"Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/orders/stream/*"
},
{
"Effect": "Allow",
"Action": ["dynamodb:PutItem", "dynamodb:BatchWriteItem"],
"Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/audit-log"
}

Configure stream trigger:

Terminal window
# First enable streams on the source table
aws dynamodb update-table \
--table-name orders \
--stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
# Get the stream ARN
STREAM_ARN=$(aws dynamodb describe-table \
--table-name orders \
--query 'Table.LatestStreamArn' --output text)
# Add stream as event source
aws lambda create-event-source-mapping \
--function-name audit-logger \
--event-source-arn $STREAM_ARN \
--starting-position LATEST \
--batch-size 100

Example 3: API Gateway — REST Endpoint

A Lambda function behind API Gateway that handles CRUD operations for a items table.

import boto3
import json
import os
from decimal import Decimal
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])
def decimal_to_float(obj):
"""DynamoDB returns Decimals; JSON can't serialise them."""
if isinstance(obj, Decimal):
return float(obj)
raise TypeError
def lambda_handler(event, context):
http_method = event['httpMethod']
path_params = event.get('pathParameters') or {}
body = json.loads(event.get('body') or '{}')
try:
if http_method == 'GET' and 'itemId' in path_params:
result = table.get_item(Key={'itemId': path_params['itemId']})
item = result.get('Item')
if not item:
return response(404, {'error': 'Item not found'})
return response(200, item)
elif http_method == 'POST':
import uuid
item = {**body, 'itemId': str(uuid.uuid4())}
table.put_item(Item=item)
return response(201, {'itemId': item['itemId']})
elif http_method == 'DELETE' and 'itemId' in path_params:
table.delete_item(Key={'itemId': path_params['itemId']})
return response(204, {})
else:
return response(400, {'error': 'Unsupported operation'})
except Exception as e:
print(f"Error: {e}")
return response(500, {'error': 'Internal server error'})
def response(status_code, body):
return {
'statusCode': status_code,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
},
'body': json.dumps(body, default=decimal_to_float),
}

Required IAM permissions:

{
"Effect": "Allow",
"Action": ["dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:DeleteItem"],
"Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/items"
}

Quick deploy with API Gateway using SAM:

template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
ItemsFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: lambda_function.lambda_handler
Runtime: python3.12
Environment:
Variables:
TABLE_NAME: !Ref ItemsTable
Events:
GetItem:
Type: Api
Properties:
Path: /items/{itemId}
Method: GET
CreateItem:
Type: Api
Properties:
Path: /items
Method: POST

Example 4: SNS Topic Subscriber — Email Notification

Lambda subscribed to an SNS topic sends formatted email notifications via SES.

import boto3
import json
import os
ses = boto3.client('ses', region_name='us-east-1')
FROM_EMAIL = os.environ['FROM_EMAIL']
ADMIN_EMAIL = os.environ['ADMIN_EMAIL']
def lambda_handler(event, context):
for record in event['Records']:
# SNS message is a string; parse if it's JSON
message_str = record['Sns']['Message']
subject = record['Sns']['Subject'] or 'Notification'
try:
message_data = json.loads(message_str)
body = format_message(message_data)
except json.JSONDecodeError:
body = message_str
ses.send_email(
Source=FROM_EMAIL,
Destination={'ToAddresses': [ADMIN_EMAIL]},
Message={
'Subject': {'Data': subject},
'Body': {
'Text': {'Data': body},
'Html': {'Data': f'<pre>{body}</pre>'},
}
}
)
print(f"Sent email for SNS message {record['Sns']['MessageId']}")
return {'emailsSent': len(event['Records'])}
def format_message(data):
lines = []
for key, value in data.items():
lines.append(f"{key}: {value}")
return '\n'.join(lines)

Required IAM permissions:

{
"Effect": "Allow",
"Action": ["ses:SendEmail"],
"Resource": "*",
"Condition": {
"StringEquals": {
"ses:FromAddress": "noreply@example.com"
}
}
}

Subscribe Lambda to SNS:

Terminal window
# Allow SNS to invoke Lambda
aws lambda add-permission \
--function-name email-notifier \
--statement-id sns-trigger \
--action lambda:InvokeFunction \
--principal sns.amazonaws.com \
--source-arn arn:aws:sns:us-east-1:123456789012:alerts-topic
# Subscribe Lambda to SNS topic
aws sns subscribe \
--topic-arn arn:aws:sns:us-east-1:123456789012:alerts-topic \
--protocol lambda \
--notification-endpoint arn:aws:lambda:us-east-1:123456789012:function:email-notifier

Testing Lambda Functions Locally

Before deploying, test the handler function directly in Python with a mock event:

test_local.py
import json
from lambda_function import lambda_handler
# Mock S3 event
mock_event = {
"Records": [{
"s3": {
"bucket": {"name": "test-bucket"},
"object": {"key": "test/document.txt"}
}
}]
}
class MockContext:
aws_request_id = "test-request-123"
function_name = "test-function"
def get_remaining_time_in_millis(self):
return 30000
result = lambda_handler(mock_event, MockContext())
print(json.dumps(result, indent=2))

For more complex testing with real AWS services locally, use AWS SAM CLI:

Terminal window
# Start local API
sam local start-api
# Invoke function locally with event file
sam local invoke ItemsFunction -e events/get-item.json

Error Handling Best Practices

Always distinguish between retryable and non-retryable errors:

def lambda_handler(event, context):
for record in event['Records']:
try:
process_record(record)
except ValueError as e:
# Data problem — no point retrying
print(f"Bad data in record, skipping: {e}")
log_to_dead_letter(record, str(e))
except Exception as e:
# Unknown error — raise to trigger retry
print(f"Unexpected error: {e}")
raise

For SQS-triggered functions, return successfully processed message IDs so Lambda does not retry them alongside failed ones:

def lambda_handler(event, context):
batch_item_failures = []
for record in event['Records']:
try:
process(record)
except Exception as e:
batch_item_failures.append(
{"itemIdentifier": record["messageId"]}
)
return {"batchItemFailures": batch_item_failures}

This requires the SQS trigger to have Report Batch Item Failures enabled.