AWS Lambda and the OWASP Serverless Top 10: A Security Deep Dive
Comprehensive guide to securing AWS Lambda functions against the OWASP Serverless Top 10 vulnerabilities with practical code examples and mitigation strategies.
AWS Lambda and the OWASP Serverless Top 10: A Security Deep Dive
Serverless computing has revolutionized how we build and deploy applications, but it has also introduced new security challenges that traditional security frameworks don’t address. The OWASP Serverless Top 10 provides a comprehensive framework for understanding and mitigating serverless-specific security risks.
This deep dive explores each vulnerability through the lens of AWS Lambda, providing practical examples and robust mitigation strategies.
SLS01: Function Event-Data Injection
Event-data injection occurs when untrusted data from event sources is processed without proper validation, leading to code injection, command execution, or data corruption.
Vulnerable Lambda Function
import json
import subprocess
import boto3
import os
def lambda_handler(event, context):
"""VULNERABLE: Direct processing of event data without validation"""
# SLS01 Vulnerability #1: Command Injection via event data
username = event.get('username', '')
# Dangerous: Direct shell execution with user input
result = subprocess.run(f"echo 'Processing user: {username}'",
shell=True, capture_output=True, text=True)
# SLS01 Vulnerability #2: SQL Injection via event data
db_query = event.get('query', '')
# Dangerous: Direct SQL execution
connection = get_db_connection()
cursor = connection.cursor()
cursor.execute(f"SELECT * FROM users WHERE name = '{db_query}'") # SQL Injection!
# SLS01 Vulnerability #3: NoSQL Injection via MongoDB
mongo_filter = event.get('filter', {})
# Dangerous: Direct MongoDB query with user input
db = get_mongo_db()
results = db.users.find(mongo_filter) # NoSQL Injection!
# SLS01 Vulnerability #4: Server-Side Template Injection
template_data = event.get('template', '')
# Dangerous: Direct template rendering
from jinja2 import Template
template = Template(template_data) # SSTI vulnerability!
rendered = template.render(user=username)
return {
'statusCode': 200,
'body': json.dumps({
'message': rendered,
'query_results': list(results)
})
}
# Attack payloads that would exploit these vulnerabilities:
malicious_event_1 = {
'username': '; rm -rf / #', # Command injection
}
malicious_event_2 = {
'query': "' OR 1=1 --", # SQL injection
}
malicious_event_3 = {
'filter': {'$where': 'function() { return true; }'}, # NoSQL injection
}
malicious_event_4 = {
'template': '{{ "".class.mro()[2].subclasses()[40]("/etc/passwd").read() }}', # SSTI
}
Secure Implementation
import json
import re
import boto3
from botocore.exceptions import ClientError
import logging
from typing import Dict, Any, Optional
import sqlparse
from cerberus import Validator
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class SecureLambdaHandler:
"""Secure Lambda handler with comprehensive input validation"""
def __init__(self):
# Define strict validation schemas
self.event_schemas = {
'user_request': {
'username': {
'type': 'string',
'regex': '^[a-zA-Z0-9_-]{3,30}$',
'required': True
},
'email': {
'type': 'string',
'regex': '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
'required': False
},
'query_type': {
'type': 'string',
'allowed': ['user_lookup', 'profile_update', 'data_export'],
'required': True
}
}
}
self.validator = Validator()
self.sanitizer = DataSanitizer()
def validate_event_data(self, event: Dict[str, Any], schema_name: str) -> Dict[str, Any]:
"""Validate and sanitize event data"""
if schema_name not in self.event_schemas:
raise ValueError(f"Unknown schema: {schema_name}")
schema = self.event_schemas[schema_name]
# Validate against schema
if not self.validator.validate(event, schema):
logger.error(f"Event validation failed: {self.validator.errors}")
raise ValueError(f"Invalid event data: {self.validator.errors}")
# Additional sanitization
sanitized_event = self.sanitizer.sanitize_event(event)
return sanitized_event
def secure_database_query(self, query_params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Secure database query using parameterized queries"""
try:
# Use parameterized queries - NEVER string concatenation
connection = self.get_secure_db_connection()
with connection.cursor() as cursor:
# Parameterized query prevents SQL injection
sql = "SELECT id, username, email FROM users WHERE username = %s AND status = %s"
cursor.execute(sql, (query_params['username'], 'active'))
result = cursor.fetchone()
if result:
return {
'id': result[0],
'username': result[1],
'email': result[2]
}
except Exception as e:
logger.error(f"Database query error: {str(e)}")
raise
return None
def secure_nosql_query(self, filter_params: Dict[str, Any]) -> list:
"""Secure NoSQL query with proper sanitization"""
# Whitelist allowed filter operations
allowed_operations = ['$eq', '$ne', '$in', '$nin']
# Build secure filter
secure_filter = {}
if 'username' in filter_params:
# Only allow exact matches for usernames
secure_filter['username'] = {'$eq': filter_params['username']}
if 'status' in filter_params and filter_params['status'] in ['active', 'inactive']:
secure_filter['status'] = {'$eq': filter_params['status']}
# Execute safe query
try:
db = self.get_mongo_db()
results = list(db.users.find(secure_filter).limit(100)) # Limit results
# Remove sensitive fields
sanitized_results = []
for result in results:
sanitized_results.append({
'id': str(result['_id']),
'username': result['username'],
'created_at': result['created_at']
})
return sanitized_results
except Exception as e:
logger.error(f"NoSQL query error: {str(e)}")
raise
def lambda_handler(event, context):
"""Secure Lambda handler"""
handler = SecureLambdaHandler()
try:
# Validate and sanitize input
validated_event = handler.validate_event_data(event, 'user_request')
# Process based on query type
query_type = validated_event['query_type']
if query_type == 'user_lookup':
result = handler.secure_database_query(validated_event)
elif query_type == 'profile_update':
result = handler.update_user_profile(validated_event)
else:
raise ValueError(f"Unsupported query type: {query_type}")
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'X-XSS-Protection': '1; mode=block'
},
'body': json.dumps({
'success': True,
'data': result
})
}
except ValueError as e:
logger.error(f"Validation error: {str(e)}")
return {
'statusCode': 400,
'body': json.dumps({'error': 'Invalid request data'})
}
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': 'Internal server error'})
}
class DataSanitizer:
"""Comprehensive data sanitization utilities"""
def __init__(self):
# Dangerous patterns to remove
self.dangerous_patterns = [
r'<script.*?</script>',
r'javascript:',
r'vbscript:',
r'on\w+\s*=',
r'eval\s*\(',
r'exec\s*\(',
r'\$\{.*?\}', # Template injection patterns
r'\{\{.*?\}\}',
r'<%.*?%>',
]
def sanitize_string(self, value: str) -> str:
"""Sanitize string input"""
if not isinstance(value, str):
return str(value)
# Remove dangerous patterns
for pattern in self.dangerous_patterns:
value = re.sub(pattern, '', value, flags=re.IGNORECASE)
# HTML encode special characters
value = value.replace('&', '&')
value = value.replace('<', '<')
value = value.replace('>', '>')
value = value.replace('"', '"')
value = value.replace("'", ''')
return value.strip()
def sanitize_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""Recursively sanitize event data"""
sanitized = {}
for key, value in event.items():
if isinstance(value, str):
sanitized[key] = self.sanitize_string(value)
elif isinstance(value, dict):
sanitized[key] = self.sanitize_event(value)
elif isinstance(value, list):
sanitized[key] = [self.sanitize_string(item) if isinstance(item, str) else item for item in value]
else:
sanitized[key] = value
return sanitized
SLS02: Broken Authentication
Serverless functions often implement custom authentication mechanisms that can be easily bypassed or exploited.
Vulnerable JWT Implementation
import jwt
import json
from datetime import datetime, timedelta
def lambda_handler(event, context):
"""VULNERABLE: Weak JWT implementation"""
# SLS02 Vulnerability #1: Weak secret key
SECRET_KEY = "secret123" # Weak, predictable key
# SLS02 Vulnerability #2: No algorithm verification
token = event['headers'].get('Authorization', '').replace('Bearer ', '')
try:
# Dangerous: Doesn't specify algorithm, vulnerable to none algorithm attack
decoded = jwt.decode(token, SECRET_KEY, options={"verify_signature": False})
# SLS02 Vulnerability #3: No token expiration validation
# Missing: exp claim validation
# SLS02 Vulnerability #4: No issuer/audience validation
user_id = decoded.get('user_id')
role = decoded.get('role', 'user') # Default to user role
# SLS02 Vulnerability #5: Privilege escalation via role manipulation
if role == 'admin':
return admin_function(user_id)
else:
return user_function(user_id)
except jwt.InvalidTokenError:
return {
'statusCode': 401,
'body': json.dumps({'error': 'Invalid token'})
}
# Attack scenarios:
# 1. None algorithm attack: JWT with "alg": "none"
# 2. Weak key brute force
# 3. Token without expiration
# 4. Role escalation by modifying JWT claims
Secure JWT Implementation
import jwt
import json
import os
import boto3
from datetime import datetime, timedelta
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class SecureJWTValidator:
"""Secure JWT validation with comprehensive security controls"""
def __init__(self):
# Use AWS Systems Manager for secure key storage
self.ssm_client = boto3.client('ssm')
self.secrets_client = boto3.client('secretsmanager')
# Supported algorithms (explicitly no 'none')
self.allowed_algorithms = ['RS256', 'ES256', 'HS256']
# JWT validation options
self.jwt_options = {
'verify_signature': True,
'verify_exp': True,
'verify_nbf': True,
'verify_iat': True,
'verify_aud': True,
'verify_iss': True,
'require_exp': True,
'require_iat': True,
'require_nbf': False
}
# Token constraints
self.max_token_age = timedelta(hours=1)
self.allowed_issuers = [
'https://auth.yourcompany.com',
'https://identity.yourcompany.com'
]
self.allowed_audiences = [
'lambda-api',
'serverless-backend'
]
def get_signing_key(self) -> str:
"""Retrieve signing key from AWS Secrets Manager"""
try:
response = self.secrets_client.get_secret_value(
SecretId=os.environ['JWT_SECRET_ARN']
)
secret = json.loads(response['SecretString'])
return secret['signing_key']
except Exception as e:
logger.error(f"Failed to retrieve signing key: {str(e)}")
raise ValueError("Authentication service unavailable")
def validate_token_structure(self, token: str) -> Dict[str, Any]:
"""Validate JWT structure before decoding"""
# Check token format
parts = token.split('.')
if len(parts) != 3:
raise ValueError("Invalid token format")
# Decode header to check algorithm
try:
import base64
header = json.loads(base64.urlsafe_b64decode(parts[0] + '=='))
except:
raise ValueError("Invalid token header")
# Verify algorithm is allowed
algorithm = header.get('alg')
if algorithm not in self.allowed_algorithms:
raise ValueError(f"Algorithm {algorithm} not allowed")
# Prevent none algorithm attack
if algorithm.lower() == 'none':
raise ValueError("None algorithm not permitted")
return header
def validate_jwt_token(self, token: str) -> Dict[str, Any]:
"""Comprehensive JWT validation"""
if not token:
raise ValueError("No token provided")
# Remove Bearer prefix if present
if token.startswith('Bearer '):
token = token[7:]
# Validate token structure
header = self.validate_token_structure(token)
# Get signing key
signing_key = self.get_signing_key()
try:
# Decode and validate token
decoded_token = jwt.decode(
token,
signing_key,
algorithms=self.allowed_algorithms,
options=self.jwt_options,
audience=self.allowed_audiences,
issuer=self.allowed_issuers,
leeway=timedelta(seconds=10) # Allow 10 seconds clock skew
)
# Additional custom validations
self.validate_custom_claims(decoded_token)
return decoded_token
except jwt.ExpiredSignatureError:
raise ValueError("Token has expired")
except jwt.InvalidAudienceError:
raise ValueError("Invalid audience")
except jwt.InvalidIssuerError:
raise ValueError("Invalid issuer")
except jwt.InvalidSignatureError:
raise ValueError("Invalid signature")
except jwt.InvalidTokenError as e:
raise ValueError(f"Invalid token: {str(e)}")
def validate_custom_claims(self, claims: Dict[str, Any]):
"""Validate custom claims and business logic"""
# Validate required claims
required_claims = ['user_id', 'role', 'permissions', 'session_id']
for claim in required_claims:
if claim not in claims:
raise ValueError(f"Missing required claim: {claim}")
# Validate user_id format
user_id = claims['user_id']
if not isinstance(user_id, str) or not user_id.isalnum():
raise ValueError("Invalid user_id format")
# Validate role
allowed_roles = ['user', 'moderator', 'admin']
if claims['role'] not in allowed_roles:
raise ValueError(f"Invalid role: {claims['role']}")
# Validate permissions structure
permissions = claims['permissions']
if not isinstance(permissions, list):
raise ValueError("Permissions must be a list")
# Validate session (prevent token reuse after logout)
session_id = claims['session_id']
if not self.validate_active_session(user_id, session_id):
raise ValueError("Session no longer active")
# Check for privileged operations
if claims['role'] == 'admin':
# Additional validation for admin tokens
if not self.validate_admin_token(claims):
raise ValueError("Invalid admin token")
def validate_active_session(self, user_id: str, session_id: str) -> bool:
"""Check if session is still active (not logged out)"""
try:
# Check against active sessions in DynamoDB
dynamodb = boto3.resource('dynamodb')
sessions_table = dynamodb.Table(os.environ['SESSIONS_TABLE'])
response = sessions_table.get_item(
Key={
'user_id': user_id,
'session_id': session_id
}
)
item = response.get('Item')
if not item:
return False
# Check if session is expired
expiry = datetime.fromisoformat(item['expires_at'])
if datetime.utcnow() > expiry:
return False
# Check if session is revoked
return item.get('status') == 'active'
except Exception as e:
logger.error(f"Session validation error: {str(e)}")
return False
def validate_admin_token(self, claims: Dict[str, Any]) -> bool:
"""Additional validation for admin tokens"""
# Admin tokens must have shorter expiration
issued_at = datetime.fromtimestamp(claims['iat'])
if datetime.utcnow() - issued_at > timedelta(minutes=30):
return False
# Admin tokens must include specific claims
admin_claims = ['admin_level', 'approved_by', 'approval_timestamp']
for claim in admin_claims:
if claim not in claims:
return False
# Validate admin level
admin_level = claims['admin_level']
if admin_level not in ['level1', 'level2', 'level3']:
return False
return True
def lambda_handler(event, context):
"""Secure Lambda handler with robust authentication"""
jwt_validator = SecureJWTValidator()
try:
# Extract token from Authorization header
auth_header = event.get('headers', {}).get('Authorization', '')
if not auth_header.startswith('Bearer '):
return {
'statusCode': 401,
'headers': {
'WWW-Authenticate': 'Bearer realm="Lambda API"'
},
'body': json.dumps({'error': 'Bearer token required'})
}
# Validate JWT token
claims = jwt_validator.validate_jwt_token(auth_header)
# Extract user context
user_context = {
'user_id': claims['user_id'],
'role': claims['role'],
'permissions': claims['permissions'],
'session_id': claims['session_id']
}
# Check permissions for requested operation
requested_operation = event.get('pathParameters', {}).get('operation')
if not has_permission(user_context, requested_operation):
return {
'statusCode': 403,
'body': json.dumps({'error': 'Insufficient permissions'})
}
# Process authenticated request
result = process_authenticated_request(event, user_context)
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Pragma': 'no-cache',
'Expires': '0'
},
'body': json.dumps(result)
}
except ValueError as e:
logger.warning(f"Authentication error: {str(e)}")
return {
'statusCode': 401,
'body': json.dumps({'error': 'Authentication failed'})
}
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': 'Internal server error'})
}
def has_permission(user_context: Dict[str, Any], operation: str) -> bool:
"""Check if user has permission for specific operation"""
# Define permission matrix
permission_matrix = {
'read_profile': ['user', 'moderator', 'admin'],
'update_profile': ['user', 'moderator', 'admin'],
'delete_user': ['admin'],
'manage_roles': ['admin'],
'view_analytics': ['moderator', 'admin']
}
required_roles = permission_matrix.get(operation, [])
user_role = user_context['role']
# Check role-based permissions
if user_role not in required_roles:
return False
# Check fine-grained permissions
user_permissions = user_context['permissions']
operation_permission = f"operation:{operation}"
return operation_permission in user_permissions
SLS03: Insecure Serverless Deployment Configuration
Misconfigured serverless deployments can expose sensitive data and create attack vectors.
Vulnerable Serverless Configuration
# serverless.yml - VULNERABLE configuration
service: vulnerable-lambda-api
provider:
name: aws
runtime: python3.9
stage: prod
region: us-east-1
# SLS03 Vulnerability #1: Overly permissive IAM permissions
iamRoleStatements:
- Effect: Allow
Action: "*" # DANGEROUS: Full AWS access!
Resource: "*"
# SLS03 Vulnerability #2: No environment variable encryption
environment:
DATABASE_PASSWORD: "prod_password_123" # Plaintext secret!
API_KEY: "sk-1234567890abcdef" # Exposed API key!
JWT_SECRET: "supersecret" # Weak secret in plaintext!
functions:
api:
handler: handler.lambda_handler
# SLS03 Vulnerability #3: No timeout limits
timeout: 900 # 15 minutes - way too long!
# SLS03 Vulnerability #4: Excessive memory allocation
memorySize: 3008 # Maximum memory when function might need 128MB
# SLS03 Vulnerability #5: No reserved concurrency
# Missing: reservedConcurrency setting
events:
- http:
path: /{proxy+}
method: ANY
# SLS03 Vulnerability #6: No API Gateway configuration
# Missing: request validation, throttling, authentication
# SLS03 Vulnerability #7: No VPC configuration
# Function has full internet access without restrictions
# SLS03 Vulnerability #8: No dead letter queue configuration
# Missing: onError/deadLetterQueue configuration
resources:
Resources:
# SLS03 Vulnerability #9: S3 bucket with public access
DataBucket:
Type: AWS::S3::Bucket
Properties:
PublicAccessBlockConfiguration:
BlockPublicAcls: false # DANGEROUS!
BlockPublicPolicy: false # DANGEROUS!
IgnorePublicAcls: false # DANGEROUS!
RestrictPublicBuckets: false # DANGEROUS!
# SLS03 Vulnerability #10: DynamoDB without encryption
UsersTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: users
# Missing: Server-side encryption configuration
# Missing: Point-in-time recovery
# Missing: Backup configuration
Secure Serverless Configuration
# serverless.yml - SECURE configuration
service: secure-lambda-api
frameworkVersion: "3"
provider:
name: aws
runtime: python3.9
stage: ${opt:stage, 'dev'}
region: ${opt:region, 'us-east-1'}
# Secure IAM role with least privilege principle
iamRoleStatements:
# DynamoDB permissions - specific table only
- Effect: Allow
Action:
- dynamodb:GetItem
- dynamodb:PutItem
- dynamodb:UpdateItem
- dynamodb:DeleteItem
- dynamodb:Query
- dynamodb:Scan
Resource:
- !GetAtt UsersTable.Arn
- !Sub "${UsersTable.Arn}/index/*"
# S3 permissions - specific bucket and operations only
- Effect: Allow
Action:
- s3:GetObject
- s3:PutObject
Resource: !Sub "${DataBucket}/*"
# Secrets Manager permissions - specific secrets only
- Effect: Allow
Action:
- secretsmanager:GetSecretValue
Resource:
- !Ref DatabaseSecret
- !Ref JWTSecret
- !Ref APIKeySecret
# KMS permissions for encryption/decryption
- Effect: Allow
Action:
- kms:Decrypt
- kms:GenerateDataKey
Resource: !GetAtt LambdaKMSKey.Arn
# CloudWatch Logs permissions
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"
# Secure environment variables using AWS Secrets Manager
environment:
STAGE: ${self:provider.stage}
REGION: ${self:provider.region}
USERS_TABLE: !Ref UsersTable
DATA_BUCKET: !Ref DataBucket
# Secrets stored in AWS Secrets Manager, not as plain text
DATABASE_SECRET_ARN: !Ref DatabaseSecret
JWT_SECRET_ARN: !Ref JWTSecret
API_KEY_SECRET_ARN: !Ref APIKeySecret
KMS_KEY_ID: !GetAtt LambdaKMSKey.Arn
# Global function configuration
vpc:
securityGroupIds:
- !Ref LambdaSecurityGroup
subnetIds:
- !Ref PrivateSubnet1
- !Ref PrivateSubnet2
# Enable tracing
tracing:
lambda: true
apiGateway: true
functions:
api:
handler: handler.lambda_handler
# Security configurations
timeout: 30 # Short timeout
memorySize: 256 # Appropriate memory allocation
reservedConcurrency: 100 # Prevent resource exhaustion
# Environment-specific configuration
environment:
LOG_LEVEL: ${param:LOG_LEVEL, 'INFO'}
# Dead letter queue for error handling
deadLetter:
targetArn: !GetAtt ErrorQueue.Arn
# Events with security configurations
events:
- http:
path: /{proxy+}
method: ANY
# API Gateway security
request:
parameters:
headers:
Authorization: true # Require Authorization header
# Request validation
requestValidatorName: RequestValidator
# CORS configuration
cors:
origin: 'https://yourapp.com'
headers:
- Content-Type
- X-Amz-Date
- Authorization
- X-Api-Key
- X-Amz-Security-Token
allowCredentials: true
# Health check function with minimal permissions
health:
handler: health.handler
timeout: 10
memorySize: 128
iamRoleStatements: [] # No additional permissions needed
events:
- http:
path: /health
method: GET
# Security-focused custom resources
resources:
Resources:
# KMS key for encryption
LambdaKMSKey:
Type: AWS::KMS::Key
Properties:
Description: "KMS Key for Lambda encryption"
KeyPolicy:
Statement:
- Sid: Enable IAM User Permissions
Effect: Allow
Principal:
AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
Action: "kms:*"
Resource: "*"
- Sid: Allow Lambda service
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action:
- kms:Decrypt
- kms:GenerateDataKey
Resource: "*"
# KMS Key Alias
LambdaKMSKeyAlias:
Type: AWS::KMS::Alias
Properties:
AliasName: alias/lambda-${self:provider.stage}
TargetKeyId: !Ref LambdaKMSKey
# Secure S3 bucket
DataBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub "secure-lambda-data-${self:provider.stage}-${AWS::AccountId}"
# Enable versioning
VersioningConfiguration:
Status: Enabled
# Server-side encryption
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: aws:kms
KMSMasterKeyID: !GetAtt LambdaKMSKey.Arn
BucketKeyEnabled: true
# Block all public access
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
# Lifecycle policy
LifecycleConfiguration:
Rules:
- Id: DeleteOldVersions
Status: Enabled
NoncurrentVersionExpirationInDays: 30
# Logging configuration
LoggingConfiguration:
DestinationBucketName: !Ref LoggingBucket
LogFilePrefix: access-logs/
# Notification configuration for security monitoring
NotificationConfiguration:
CloudWatchConfigurations:
- Event: s3:ObjectCreated:*
CloudWatchConfiguration:
LogGroupName: !Ref S3AccessLogGroup
# Secure DynamoDB table
UsersTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub "users-${self:provider.stage}"
# Billing mode and capacity
BillingMode: PAY_PER_REQUEST
# Attribute definitions
AttributeDefinitions:
- AttributeName: user_id
AttributeType: S
- AttributeName: email
AttributeType: S
# Key schema
KeySchema:
- AttributeName: user_id
KeyType: HASH
# Global Secondary Indexes
GlobalSecondaryIndexes:
- IndexName: EmailIndex
KeySchema:
- AttributeName: email
KeyType: HASH
Projection:
ProjectionType: ALL
# Encryption at rest
SSESpecification:
SSEEnabled: true
KMSMasterKeyId: !GetAtt LambdaKMSKey.Arn
# Point-in-time recovery
PointInTimeRecoverySpecification:
PointInTimeRecoveryEnabled: true
# Backup policy
BackupPolicy:
PointInTimeRecoveryEnabled: true
# Stream specification for audit logging
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
# Tags
Tags:
- Key: Environment
Value: ${self:provider.stage}
- Key: Application
Value: secure-lambda-api
# Secrets in AWS Secrets Manager
DatabaseSecret:
Type: AWS::SecretsManager::Secret
Properties:
Name: !Sub "lambda-db-secret-${self:provider.stage}"
Description: "Database credentials for Lambda"
KmsKeyId: !Ref LambdaKMSKey
SecretString: !Sub |
{
"username": "lambda_user",
"password": "${param:DB_PASSWORD}",
"host": "${param:DB_HOST}",
"port": 5432,
"database": "production"
}
JWTSecret:
Type: AWS::SecretsManager::Secret
Properties:
Name: !Sub "lambda-jwt-secret-${self:provider.stage}"
Description: "JWT signing key for Lambda"
KmsKeyId: !Ref LambdaKMSKey
GenerateSecretString:
SecretStringTemplate: '{}'
GenerateStringKey: 'signing_key'
PasswordLength: 64
ExcludeCharacters: '"@/\'
APIKeySecret:
Type: AWS::SecretsManager::Secret
Properties:
Name: !Sub "lambda-api-key-${self:provider.stage}"
Description: "External API keys for Lambda"
KmsKeyId: !Ref LambdaKMSKey
SecretString: !Sub |
{
"openai_api_key": "${param:OPENAI_API_KEY}",
"stripe_api_key": "${param:STRIPE_API_KEY}"
}
# VPC Configuration
VPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: 10.0.0.0/16
EnableDnsHostnames: true
EnableDnsSupport: true
Tags:
- Key: Name
Value: !Sub "lambda-vpc-${self:provider.stage}"
# Private subnets for Lambda functions
PrivateSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.0.1.0/24
AvailabilityZone: !Select [0, !GetAZs '']
Tags:
- Key: Name
Value: !Sub "lambda-private-subnet-1-${self:provider.stage}"
PrivateSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.0.2.0/24
AvailabilityZone: !Select [1, !GetAZs '']
Tags:
- Key: Name
Value: !Sub "lambda-private-subnet-2-${self:provider.stage}"
# Security group for Lambda functions
LambdaSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Security group for Lambda functions
VpcId: !Ref VPC
# Egress rules - restrictive outbound access
SecurityGroupEgress:
# HTTPS to AWS services
- IpProtocol: tcp
FromPort: 443
ToPort: 443
CidrIp: 0.0.0.0/0
# Database access
- IpProtocol: tcp
FromPort: 5432
ToPort: 5432
SourceSecurityGroupId: !Ref DatabaseSecurityGroup
Tags:
- Key: Name
Value: !Sub "lambda-sg-${self:provider.stage}"
# API Gateway Request Validator
RequestValidator:
Type: AWS::ApiGateway::RequestValidator
Properties:
Name: RequestValidator
RestApiId: !Ref ApiGatewayRestApi
ValidateRequestBody: true
ValidateRequestParameters: true
# Dead Letter Queue
ErrorQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub "lambda-errors-${self:provider.stage}"
KmsMasterKeyId: !GetAtt LambdaKMSKey.Arn
MessageRetentionPeriod: 1209600 # 14 days
VisibilityTimeoutSeconds: 300
# CloudWatch Log Groups with encryption
LambdaLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub "/aws/lambda/secure-lambda-api-${self:provider.stage}-api"
RetentionInDays: 30
KmsKeyId: !GetAtt LambdaKMSKey.Arn
# Parameters for sensitive values (passed via CLI or CI/CD)
params:
default:
LOG_LEVEL: INFO
prod:
LOG_LEVEL: WARN
DB_PASSWORD: ${ssm:/lambda/prod/db_password~true}
DB_HOST: ${ssm:/lambda/prod/db_host}
OPENAI_API_KEY: ${ssm:/lambda/prod/openai_key~true}
STRIPE_API_KEY: ${ssm:/lambda/prod/stripe_key~true}
# Plugins for additional security
plugins:
- serverless-plugin-tracing
- serverless-plugin-aws-alerts
- serverless-iam-roles-per-function
- serverless-plugin-resource-tagging
SLS04: Over-Privileged Function Permissions & SLS05: Inadequate Function Monitoring and Logging
These vulnerabilities often go hand-in-hand, so I’ll address them together with a comprehensive monitoring and least-privilege implementation:
Comprehensive Security Monitoring System
# secure_lambda_with_monitoring.py
import json
import boto3
import logging
import time
from datetime import datetime, timezone
from typing import Dict, Any, Optional
import hashlib
import os
# Configure structured logging
class SecurityLogger:
"""Enhanced security-focused logging for Lambda functions"""
def __init__(self):
self.logger = logging.getLogger()
self.logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))
# Create CloudWatch client for custom metrics
self.cloudwatch = boto3.client('cloudwatch')
# Security event types
self.security_events = {
'AUTHENTICATION_FAILURE': 'security.auth.failure',
'AUTHORIZATION_FAILURE': 'security.authz.failure',
'SUSPICIOUS_INPUT': 'security.input.suspicious',
'RATE_LIMIT_EXCEEDED': 'security.rate_limit.exceeded',
'PRIVILEGE_ESCALATION': 'security.privilege.escalation',
'DATA_ACCESS_VIOLATION': 'security.data.violation',
'ANOMALOUS_BEHAVIOR': 'security.behavior.anomalous'
}
def log_security_event(self, event_type: str, details: Dict[str, Any],
context: Dict[str, Any], severity: str = 'WARNING'):
"""Log security events with structured format"""
# Create security log entry
security_log = {
'timestamp': datetime.now(timezone.utc).isoformat(),
'event_type': event_type,
'severity': severity,
'lambda_function': context.get('function_name', 'unknown'),
'request_id': context.get('aws_request_id', 'unknown'),
'details': details,
'metadata': {
'region': os.environ.get('AWS_REGION'),
'stage': os.environ.get('STAGE'),
'version': context.get('function_version', 'unknown')
}
}
# Log to CloudWatch
if severity in ['ERROR', 'CRITICAL']:
self.logger.error(json.dumps(security_log))
elif severity == 'WARNING':
self.logger.warning(json.dumps(security_log))
else:
self.logger.info(json.dumps(security_log))
# Send custom metric to CloudWatch
self.send_security_metric(event_type, severity)
# Send to security event queue for real-time alerting
if severity in ['ERROR', 'CRITICAL']:
self.send_to_security_queue(security_log)
def send_security_metric(self, event_type: str, severity: str):
"""Send custom security metrics to CloudWatch"""
try:
self.cloudwatch.put_metric_data(
Namespace='Lambda/Security',
MetricData=[
{
'MetricName': 'SecurityEvents',
'Dimensions': [
{
'Name': 'EventType',
'Value': event_type
},
{
'Name': 'Severity',
'Value': severity
},
{
'Name': 'FunctionName',
'Value': os.environ.get('AWS_LAMBDA_FUNCTION_NAME', 'unknown')
}
],
'Value': 1,
'Unit': 'Count',
'Timestamp': datetime.now(timezone.utc)
}
]
)
except Exception as e:
self.logger.error(f"Failed to send security metric: {str(e)}")
def send_to_security_queue(self, security_log: Dict[str, Any]):
"""Send critical security events to SQS for real-time processing"""
try:
sqs = boto3.client('sqs')
queue_url = os.environ.get('SECURITY_ALERTS_QUEUE')
if queue_url:
sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(security_log),
MessageAttributes={
'EventType': {
'StringValue': security_log['event_type'],
'DataType': 'String'
},
'Severity': {
'StringValue': security_log['severity'],
'DataType': 'String'
}
}
)
except Exception as e:
self.logger.error(f"Failed to send to security queue: {str(e)}")
class PrivilegeManager:
"""Implement fine-grained privilege management"""
def __init__(self):
self.security_logger = SecurityLogger()
# Define operation-specific permissions
self.operation_permissions = {
'read_user_profile': {
'required_role': 'user',
'aws_permissions': ['dynamodb:GetItem'],
'resource_pattern': 'users-table',
'data_classification': 'internal'
},
'update_user_profile': {
'required_role': 'user',
'aws_permissions': ['dynamodb:UpdateItem'],
'resource_pattern': 'users-table',
'data_classification': 'internal',
'owner_only': True
},
'delete_user': {
'required_role': 'admin',
'aws_permissions': ['dynamodb:DeleteItem'],
'resource_pattern': 'users-table',
'data_classification': 'confidential'
},
'export_user_data': {
'required_role': 'admin',
'aws_permissions': ['dynamodb:Scan', 's3:PutObject'],
'resource_pattern': ['users-table', 'export-bucket'],
'data_classification': 'confidential',
'audit_required': True
}
}
def check_operation_permissions(self, operation: str, user_context: Dict[str, Any],
resource_context: Dict[str, Any]) -> Dict[str, Any]:
"""Check if user has permissions for specific operation"""
permission_check = {
'allowed': False,
'reason': '',
'audit_required': False,
'restrictions': []
}
if operation not in self.operation_permissions:
permission_check['reason'] = f"Unknown operation: {operation}"
self.security_logger.log_security_event(
'AUTHORIZATION_FAILURE',
{'operation': operation, 'reason': 'unknown_operation'},
user_context,
'ERROR'
)
return permission_check
op_config = self.operation_permissions[operation]
# Check role requirements
required_role = op_config['required_role']
user_role = user_context.get('role', 'guest')
role_hierarchy = {'guest': 0, 'user': 1, 'moderator': 2, 'admin': 3}
if role_hierarchy.get(user_role, 0) < role_hierarchy.get(required_role, 999):
permission_check['reason'] = f"Insufficient role: {user_role} < {required_role}"
self.security_logger.log_security_event(
'AUTHORIZATION_FAILURE',
{
'operation': operation,
'user_role': user_role,
'required_role': required_role
},
user_context,
'WARNING'
)
return permission_check
# Check owner-only restrictions
if op_config.get('owner_only', False):
user_id = user_context.get('user_id')
resource_owner = resource_context.get('owner_id')
if user_id != resource_owner and user_role != 'admin':
permission_check['reason'] = "Owner-only operation"
self.security_logger.log_security_event(
'AUTHORIZATION_FAILURE',
{
'operation': operation,
'user_id': user_id,
'resource_owner': resource_owner
},
user_context,
'WARNING'
)
return permission_check
# Check data classification restrictions
data_classification = op_config.get('data_classification', 'public')
user_clearance = user_context.get('data_clearance', 'public')
clearance_levels = {'public': 0, 'internal': 1, 'confidential': 2, 'secret': 3}
if clearance_levels.get(user_clearance, 0) < clearance_levels.get(data_classification, 0):
permission_check['reason'] = f"Insufficient clearance: {user_clearance} < {data_classification}"
self.security_logger.log_security_event(
'AUTHORIZATION_FAILURE',
{
'operation': operation,
'user_clearance': user_clearance,
'required_clearance': data_classification
},
user_context,
'ERROR'
)
return permission_check
# Set audit requirements
permission_check['audit_required'] = op_config.get('audit_required', False)
# Operation is allowed
permission_check['allowed'] = True
permission_check['reason'] = 'Permission granted'
return permission_check
class SecurityAuditLogger:
"""Comprehensive audit logging for compliance"""
def __init__(self):
self.dynamodb = boto3.resource('dynamodb')
self.audit_table = self.dynamodb.Table(os.environ.get('AUDIT_TABLE', 'security-audit'))
self.s3_client = boto3.client('s3')
self.audit_bucket = os.environ.get('AUDIT_BUCKET')
def log_audit_event(self, event_type: str, user_context: Dict[str, Any],
operation_details: Dict[str, Any], lambda_context: Dict[str, Any]):
"""Log audit events for compliance and forensics"""
# Create audit record
audit_record = {
'audit_id': self.generate_audit_id(user_context, lambda_context),
'timestamp': datetime.now(timezone.utc).isoformat(),
'event_type': event_type,
'user_id': user_context.get('user_id', 'anonymous'),
'user_role': user_context.get('role', 'unknown'),
'session_id': user_context.get('session_id', 'unknown'),
'function_name': lambda_context.get('function_name'),
'request_id': lambda_context.get('aws_request_id'),
'operation_details': operation_details,
'source_ip': operation_details.get('source_ip', 'unknown'),
'user_agent': operation_details.get('user_agent', 'unknown'),
'ttl': int(time.time()) + (365 * 24 * 3600) # Retain for 1 year
}
# Store in DynamoDB
try:
self.audit_table.put_item(Item=audit_record)
except Exception as e:
logging.error(f"Failed to store audit record: {str(e)}")
# Also store in S3 for long-term retention and analysis
if self.audit_bucket:
self.store_audit_in_s3(audit_record)
def generate_audit_id(self, user_context: Dict[str, Any], lambda_context: Dict[str, Any]) -> str:
"""Generate unique audit ID"""
content = f"{user_context.get('user_id')}-{lambda_context.get('aws_request_id')}-{time.time()}"
return hashlib.sha256(content.encode()).hexdigest()[:16]
def store_audit_in_s3(self, audit_record: Dict[str, Any]):
"""Store audit record in S3 for long-term retention"""
try:
# Organize by date for efficient querying
date_prefix = datetime.now(timezone.utc).strftime('%Y/%m/%d')
s3_key = f"audit-logs/{date_prefix}/{audit_record['audit_id']}.json"
self.s3_client.put_object(
Bucket=self.audit_bucket,
Key=s3_key,
Body=json.dumps(audit_record),
ServerSideEncryption='aws:kms',
SSEKMSKeyId=os.environ.get('AUDIT_KMS_KEY'),
ContentType='application/json'
)
except Exception as e:
logging.error(f"Failed to store audit record in S3: {str(e)}")
def lambda_handler(event, context):
"""Secure Lambda handler with comprehensive monitoring and least privilege"""
# Initialize security components
security_logger = SecurityLogger()
privilege_manager = PrivilegeManager()
audit_logger = SecurityAuditLogger()
# Extract request context
request_context = {
'source_ip': event.get('requestContext', {}).get('identity', {}).get('sourceIp', 'unknown'),
'user_agent': event.get('headers', {}).get('User-Agent', 'unknown'),
'method': event.get('httpMethod', 'unknown'),
'path': event.get('path', 'unknown'),
'query_params': event.get('queryStringParameters', {}),
'headers': {k: v for k, v in event.get('headers', {}).items() if k.lower() not in ['authorization', 'cookie']}
}
lambda_context = {
'function_name': context.function_name,
'aws_request_id': context.aws_request_id,
'function_version': context.function_version,
'remaining_time': context.get_remaining_time_in_millis()
}
try:
# Step 1: Authenticate user (extract from JWT token)
auth_result = authenticate_user(event)
if not auth_result['success']:
security_logger.log_security_event(
'AUTHENTICATION_FAILURE',
{
'reason': auth_result['reason'],
'source_ip': request_context['source_ip']
},
lambda_context,
'WARNING'
)
return {
'statusCode': 401,
'body': json.dumps({'error': 'Authentication failed'})
}
user_context = auth_result['user_context']
# Step 2: Determine requested operation
requested_operation = extract_operation(event)
# Step 3: Check permissions
permission_result = privilege_manager.check_operation_permissions(
requested_operation,
user_context,
request_context
)
if not permission_result['allowed']:
security_logger.log_security_event(
'AUTHORIZATION_FAILURE',
{
'operation': requested_operation,
'reason': permission_result['reason'],
'user_id': user_context.get('user_id')
},
lambda_context,
'WARNING'
)
return {
'statusCode': 403,
'body': json.dumps({'error': 'Access denied'})
}
# Step 4: Log audit event if required
if permission_result['audit_required']:
audit_logger.log_audit_event(
'OPERATION_EXECUTED',
user_context,
{
'operation': requested_operation,
**request_context
},
lambda_context
)
# Step 5: Execute operation with monitoring
start_time = time.time()
result = execute_secure_operation(
requested_operation,
event,
user_context,
security_logger,
lambda_context
)
execution_time = time.time() - start_time
# Step 6: Log successful operation
security_logger.log_security_event(
'OPERATION_SUCCESS',
{
'operation': requested_operation,
'user_id': user_context.get('user_id'),
'execution_time': execution_time
},
lambda_context,
'INFO'
)
# Step 7: Return secure response
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'X-XSS-Protection': '1; mode=block',
'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
'Cache-Control': 'no-cache, no-store, must-revalidate'
},
'body': json.dumps({
'success': True,
'data': result,
'request_id': context.aws_request_id
})
}
except Exception as e:
# Log error with full context for debugging
security_logger.log_security_event(
'OPERATION_ERROR',
{
'error': str(e),
'error_type': type(e).__name__,
'user_id': user_context.get('user_id', 'unknown') if 'user_context' in locals() else 'unknown',
'operation': requested_operation if 'requested_operation' in locals() else 'unknown'
},
lambda_context,
'ERROR'
)
return {
'statusCode': 500,
'headers': {
'Content-Type': 'application/json'
},
'body': json.dumps({
'error': 'Internal server error',
'request_id': context.aws_request_id
})
}
def execute_secure_operation(operation: str, event: Dict[str, Any],
user_context: Dict[str, Any], security_logger: SecurityLogger,
lambda_context: Dict[str, Any]) -> Any:
"""Execute operations with security monitoring"""
# Operation routing with security checks
if operation == 'read_user_profile':
return read_user_profile_secure(event, user_context, security_logger)
elif operation == 'update_user_profile':
return update_user_profile_secure(event, user_context, security_logger)
elif operation == 'delete_user':
return delete_user_secure(event, user_context, security_logger)
else:
raise ValueError(f"Unknown operation: {operation}")
Conclusion: Building Secure Serverless Applications
The OWASP Serverless Top 10 provides a critical framework for understanding and mitigating serverless-specific security risks. As we’ve seen through these examples, securing serverless applications requires:
- Input Validation and Sanitization: Never trust event data without thorough validation
- Robust Authentication and Authorization: Implement proper JWT validation with comprehensive security checks
- Secure Configuration Management: Use AWS Secrets Manager, KMS encryption, and least-privilege IAM policies
- Comprehensive Monitoring: Implement detailed logging, metrics, and real-time alerting
- Defense in Depth: Layer multiple security controls to protect against sophisticated attacks
Key implementation principles:
- Least Privilege: Grant only the minimum permissions required for each function
- Fail Secure: Default to denying access when in doubt
- Comprehensive Auditing: Log all security-relevant events for compliance and forensics
- Real-time Monitoring: Detect and respond to security incidents quickly
- Regular Security Reviews: Continuously assess and improve security posture
The serverless paradigm offers tremendous benefits in terms of scalability and operational simplicity, but it also requires a new approach to security. By following the OWASP Serverless Top 10 guidelines and implementing the security patterns shown in this guide, you can build robust, secure serverless applications that protect your organization and users.
Remember: in serverless security, paranoia is a feature, not a bug.