AWS Lambda Examples: Real Code Patterns for S3, DynamoDB, API Gateway, and SNS
Code examples are the fastest way to understand how Lambda actually works in practice. Each example below is a self-contained pattern you can adapt — with the required IAM permissions and deployment command included.
Example 1: S3 Upload Trigger — Document Processing
A common pattern: when a document lands in S3, Lambda extracts text, counts words, and stores the result in DynamoDB.
import boto3import json
# Init code — runs once per execution environments3 = boto3.client('s3')dynamodb = boto3.resource('dynamodb')table = dynamodb.Table('document-analysis')
def lambda_handler(event, context): for record in event['Records']: bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key']
# Download the file obj = s3.get_object(Bucket=bucket, Key=key) content = obj['Body'].read().decode('utf-8')
# Simple analysis word_count = len(content.split()) char_count = len(content) lines = content.count('\n') + 1
# Store result doc_id = key.replace('/', '_').replace('.', '_') table.put_item(Item={ 'docId': doc_id, 'sourceKey': key, 'wordCount': word_count, 'charCount': char_count, 'lineCount': lines, })
print(f"Processed {key}: {word_count} words")
return {'processed': len(event['Records'])}Required IAM permissions:
{ "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::my-upload-bucket/*"},{ "Effect": "Allow", "Action": ["dynamodb:PutItem"], "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/document-analysis"}Deploy and configure S3 trigger:
zip function.zip lambda_function.py
aws lambda create-function \ --function-name doc-processor \ --runtime python3.12 \ --role arn:aws:iam::123456789012:role/doc-processor-role \ --handler lambda_function.lambda_handler \ --zip-file fileb://function.zip
# Allow S3 to invoke the functionaws lambda add-permission \ --function-name doc-processor \ --statement-id s3-trigger \ --action lambda:InvokeFunction \ --principal s3.amazonaws.com \ --source-arn arn:aws:s3:::my-upload-bucket
# Configure S3 event notificationaws s3api put-bucket-notification-configuration \ --bucket my-upload-bucket \ --notification-configuration '{ "LambdaFunctionConfigurations": [{ "LambdaFunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:doc-processor", "Events": ["s3:ObjectCreated:*"], "Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": ".txt"}]}} }] }'Example 2: DynamoDB Stream — Audit Log
DynamoDB Streams capture every insert, update, and delete. This function writes every change to a separate audit table.
import boto3from datetime import datetime
dynamodb = boto3.resource('dynamodb')audit_table = dynamodb.Table('audit-log')
def lambda_handler(event, context): audit_items = []
for record in event['Records']: if record['eventName'] == 'REMOVE': image = record['dynamodb'].get('OldImage', {}) else: image = record['dynamodb'].get('NewImage', {})
# Deserialise DynamoDB typed values entity_id = image.get('id', {}).get('S', 'unknown')
audit_items.append({ 'auditId': context.aws_request_id + '_' + record['eventID'], 'entityId': entity_id, 'eventType': record['eventName'], 'timestamp': datetime.utcnow().isoformat(), 'sequenceNumber': record['dynamodb']['SequenceNumber'], })
# Batch write to audit table with audit_table.batch_writer() as batch: for item in audit_items: batch.put_item(Item=item)
print(f"Wrote {len(audit_items)} audit records") return {'auditRecords': len(audit_items)}Required IAM permissions:
{ "Effect": "Allow", "Action": [ "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:DescribeStream", "dynamodb:ListStreams" ], "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/orders/stream/*"},{ "Effect": "Allow", "Action": ["dynamodb:PutItem", "dynamodb:BatchWriteItem"], "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/audit-log"}Configure stream trigger:
# First enable streams on the source tableaws dynamodb update-table \ --table-name orders \ --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
# Get the stream ARNSTREAM_ARN=$(aws dynamodb describe-table \ --table-name orders \ --query 'Table.LatestStreamArn' --output text)
# Add stream as event sourceaws lambda create-event-source-mapping \ --function-name audit-logger \ --event-source-arn $STREAM_ARN \ --starting-position LATEST \ --batch-size 100Example 3: API Gateway — REST Endpoint
A Lambda function behind API Gateway that handles CRUD operations for a items table.
import boto3import jsonimport osfrom decimal import Decimal
dynamodb = boto3.resource('dynamodb')table = dynamodb.Table(os.environ['TABLE_NAME'])
def decimal_to_float(obj): """DynamoDB returns Decimals; JSON can't serialise them.""" if isinstance(obj, Decimal): return float(obj) raise TypeError
def lambda_handler(event, context): http_method = event['httpMethod'] path_params = event.get('pathParameters') or {} body = json.loads(event.get('body') or '{}')
try: if http_method == 'GET' and 'itemId' in path_params: result = table.get_item(Key={'itemId': path_params['itemId']}) item = result.get('Item') if not item: return response(404, {'error': 'Item not found'}) return response(200, item)
elif http_method == 'POST': import uuid item = {**body, 'itemId': str(uuid.uuid4())} table.put_item(Item=item) return response(201, {'itemId': item['itemId']})
elif http_method == 'DELETE' and 'itemId' in path_params: table.delete_item(Key={'itemId': path_params['itemId']}) return response(204, {})
else: return response(400, {'error': 'Unsupported operation'})
except Exception as e: print(f"Error: {e}") return response(500, {'error': 'Internal server error'})
def response(status_code, body): return { 'statusCode': status_code, 'headers': { 'Content-Type': 'application/json', 'Access-Control-Allow-Origin': '*', }, 'body': json.dumps(body, default=decimal_to_float), }Required IAM permissions:
{ "Effect": "Allow", "Action": ["dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:DeleteItem"], "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/items"}Quick deploy with API Gateway using SAM:
AWSTemplateFormatVersion: '2010-09-09'Transform: AWS::Serverless-2016-10-31
Resources: ItemsFunction: Type: AWS::Serverless::Function Properties: CodeUri: src/ Handler: lambda_function.lambda_handler Runtime: python3.12 Environment: Variables: TABLE_NAME: !Ref ItemsTable Events: GetItem: Type: Api Properties: Path: /items/{itemId} Method: GET CreateItem: Type: Api Properties: Path: /items Method: POSTExample 4: SNS Topic Subscriber — Email Notification
Lambda subscribed to an SNS topic sends formatted email notifications via SES.
import boto3import jsonimport os
ses = boto3.client('ses', region_name='us-east-1')
FROM_EMAIL = os.environ['FROM_EMAIL']ADMIN_EMAIL = os.environ['ADMIN_EMAIL']
def lambda_handler(event, context): for record in event['Records']: # SNS message is a string; parse if it's JSON message_str = record['Sns']['Message'] subject = record['Sns']['Subject'] or 'Notification'
try: message_data = json.loads(message_str) body = format_message(message_data) except json.JSONDecodeError: body = message_str
ses.send_email( Source=FROM_EMAIL, Destination={'ToAddresses': [ADMIN_EMAIL]}, Message={ 'Subject': {'Data': subject}, 'Body': { 'Text': {'Data': body}, 'Html': {'Data': f'<pre>{body}</pre>'}, } } ) print(f"Sent email for SNS message {record['Sns']['MessageId']}")
return {'emailsSent': len(event['Records'])}
def format_message(data): lines = [] for key, value in data.items(): lines.append(f"{key}: {value}") return '\n'.join(lines)Required IAM permissions:
{ "Effect": "Allow", "Action": ["ses:SendEmail"], "Resource": "*", "Condition": { "StringEquals": { "ses:FromAddress": "noreply@example.com" } }}Subscribe Lambda to SNS:
# Allow SNS to invoke Lambdaaws lambda add-permission \ --function-name email-notifier \ --statement-id sns-trigger \ --action lambda:InvokeFunction \ --principal sns.amazonaws.com \ --source-arn arn:aws:sns:us-east-1:123456789012:alerts-topic
# Subscribe Lambda to SNS topicaws sns subscribe \ --topic-arn arn:aws:sns:us-east-1:123456789012:alerts-topic \ --protocol lambda \ --notification-endpoint arn:aws:lambda:us-east-1:123456789012:function:email-notifierTesting Lambda Functions Locally
Before deploying, test the handler function directly in Python with a mock event:
import jsonfrom lambda_function import lambda_handler
# Mock S3 eventmock_event = { "Records": [{ "s3": { "bucket": {"name": "test-bucket"}, "object": {"key": "test/document.txt"} } }]}
class MockContext: aws_request_id = "test-request-123" function_name = "test-function" def get_remaining_time_in_millis(self): return 30000
result = lambda_handler(mock_event, MockContext())print(json.dumps(result, indent=2))For more complex testing with real AWS services locally, use AWS SAM CLI:
# Start local APIsam local start-api
# Invoke function locally with event filesam local invoke ItemsFunction -e events/get-item.jsonError Handling Best Practices
Always distinguish between retryable and non-retryable errors:
def lambda_handler(event, context): for record in event['Records']: try: process_record(record) except ValueError as e: # Data problem — no point retrying print(f"Bad data in record, skipping: {e}") log_to_dead_letter(record, str(e)) except Exception as e: # Unknown error — raise to trigger retry print(f"Unexpected error: {e}") raiseFor SQS-triggered functions, return successfully processed message IDs so Lambda does not retry them alongside failed ones:
def lambda_handler(event, context): batch_item_failures = []
for record in event['Records']: try: process(record) except Exception as e: batch_item_failures.append( {"itemIdentifier": record["messageId"]} )
return {"batchItemFailures": batch_item_failures}This requires the SQS trigger to have Report Batch Item Failures enabled.