AWS Lambda and the OWASP Serverless Top 10: A Security Deep Dive

Serverless computing has revolutionized how we build and deploy applications, but it has also introduced new security challenges that traditional security frameworks don’t address. The OWASP Serverless Top 10 provides a comprehensive framework for understanding and mitigating serverless-specific security risks.

This deep dive explores each vulnerability through the lens of AWS Lambda, providing practical examples and robust mitigation strategies.

SLS01: Function Event-Data Injection

Event-data injection occurs when untrusted data from event sources is processed without proper validation, leading to code injection, command execution, or data corruption.

Vulnerable Lambda Function

import json
import subprocess
import boto3
import os

def lambda_handler(event, context):
    """VULNERABLE: Direct processing of event data without validation"""

    # SLS01 Vulnerability #1: Command Injection via event data
    username = event.get('username', '')

    # Dangerous: Direct shell execution with user input
    result = subprocess.run(f"echo 'Processing user: {username}'",
                           shell=True, capture_output=True, text=True)

    # SLS01 Vulnerability #2: SQL Injection via event data
    db_query = event.get('query', '')

    # Dangerous: Direct SQL execution
    connection = get_db_connection()
    cursor = connection.cursor()
    cursor.execute(f"SELECT * FROM users WHERE name = '{db_query}'")  # SQL Injection!

    # SLS01 Vulnerability #3: NoSQL Injection via MongoDB
    mongo_filter = event.get('filter', {})

    # Dangerous: Direct MongoDB query with user input
    db = get_mongo_db()
    results = db.users.find(mongo_filter)  # NoSQL Injection!

    # SLS01 Vulnerability #4: Server-Side Template Injection
    template_data = event.get('template', '')

    # Dangerous: Direct template rendering
    from jinja2 import Template
    template = Template(template_data)  # SSTI vulnerability!
    rendered = template.render(user=username)

    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': rendered,
            'query_results': list(results)
        })
    }

# Attack payloads that would exploit these vulnerabilities:
malicious_event_1 = {
    'username': '; rm -rf / #',  # Command injection
}

malicious_event_2 = {
    'query': "' OR 1=1 --",  # SQL injection
}

malicious_event_3 = {
    'filter': {'$where': 'function() { return true; }'},  # NoSQL injection
}

malicious_event_4 = {
    'template': '{{ "".class.mro()[2].subclasses()[40]("/etc/passwd").read() }}',  # SSTI
}

Secure Implementation

import json
import re
import boto3
from botocore.exceptions import ClientError
import logging
from typing import Dict, Any, Optional
import sqlparse
from cerberus import Validator

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

class SecureLambdaHandler:
    """Secure Lambda handler with comprehensive input validation"""

    def __init__(self):
        # Define strict validation schemas
        self.event_schemas = {
            'user_request': {
                'username': {
                    'type': 'string',
                    'regex': '^[a-zA-Z0-9_-]{3,30}$',
                    'required': True
                },
                'email': {
                    'type': 'string',
                    'regex': '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
                    'required': False
                },
                'query_type': {
                    'type': 'string',
                    'allowed': ['user_lookup', 'profile_update', 'data_export'],
                    'required': True
                }
            }
        }

        self.validator = Validator()
        self.sanitizer = DataSanitizer()

    def validate_event_data(self, event: Dict[str, Any], schema_name: str) -> Dict[str, Any]:
        """Validate and sanitize event data"""

        if schema_name not in self.event_schemas:
            raise ValueError(f"Unknown schema: {schema_name}")

        schema = self.event_schemas[schema_name]

        # Validate against schema
        if not self.validator.validate(event, schema):
            logger.error(f"Event validation failed: {self.validator.errors}")
            raise ValueError(f"Invalid event data: {self.validator.errors}")

        # Additional sanitization
        sanitized_event = self.sanitizer.sanitize_event(event)

        return sanitized_event

    def secure_database_query(self, query_params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Secure database query using parameterized queries"""

        try:
            # Use parameterized queries - NEVER string concatenation
            connection = self.get_secure_db_connection()

            with connection.cursor() as cursor:
                # Parameterized query prevents SQL injection
                sql = "SELECT id, username, email FROM users WHERE username = %s AND status = %s"
                cursor.execute(sql, (query_params['username'], 'active'))

                result = cursor.fetchone()
                if result:
                    return {
                        'id': result[0],
                        'username': result[1],
                        'email': result[2]
                    }

        except Exception as e:
            logger.error(f"Database query error: {str(e)}")
            raise

        return None

    def secure_nosql_query(self, filter_params: Dict[str, Any]) -> list:
        """Secure NoSQL query with proper sanitization"""

        # Whitelist allowed filter operations
        allowed_operations = ['$eq', '$ne', '$in', '$nin']

        # Build secure filter
        secure_filter = {}

        if 'username' in filter_params:
            # Only allow exact matches for usernames
            secure_filter['username'] = {'$eq': filter_params['username']}

        if 'status' in filter_params and filter_params['status'] in ['active', 'inactive']:
            secure_filter['status'] = {'$eq': filter_params['status']}

        # Execute safe query
        try:
            db = self.get_mongo_db()
            results = list(db.users.find(secure_filter).limit(100))  # Limit results

            # Remove sensitive fields
            sanitized_results = []
            for result in results:
                sanitized_results.append({
                    'id': str(result['_id']),
                    'username': result['username'],
                    'created_at': result['created_at']
                })

            return sanitized_results

        except Exception as e:
            logger.error(f"NoSQL query error: {str(e)}")
            raise

def lambda_handler(event, context):
    """Secure Lambda handler"""

    handler = SecureLambdaHandler()

    try:
        # Validate and sanitize input
        validated_event = handler.validate_event_data(event, 'user_request')

        # Process based on query type
        query_type = validated_event['query_type']

        if query_type == 'user_lookup':
            result = handler.secure_database_query(validated_event)
        elif query_type == 'profile_update':
            result = handler.update_user_profile(validated_event)
        else:
            raise ValueError(f"Unsupported query type: {query_type}")

        return {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json',
                'X-Content-Type-Options': 'nosniff',
                'X-Frame-Options': 'DENY',
                'X-XSS-Protection': '1; mode=block'
            },
            'body': json.dumps({
                'success': True,
                'data': result
            })
        }

    except ValueError as e:
        logger.error(f"Validation error: {str(e)}")
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid request data'})
        }

    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Internal server error'})
        }

class DataSanitizer:
    """Comprehensive data sanitization utilities"""

    def __init__(self):
        # Dangerous patterns to remove
        self.dangerous_patterns = [
            r'<script.*?</script>',
            r'javascript:',
            r'vbscript:',
            r'on\w+\s*=',
            r'eval\s*\(',
            r'exec\s*\(',
            r'\$\{.*?\}',  # Template injection patterns
            r'\{\{.*?\}\}',
            r'<%.*?%>',
        ]

    def sanitize_string(self, value: str) -> str:
        """Sanitize string input"""
        if not isinstance(value, str):
            return str(value)

        # Remove dangerous patterns
        for pattern in self.dangerous_patterns:
            value = re.sub(pattern, '', value, flags=re.IGNORECASE)

        # HTML encode special characters
        value = value.replace('&', '&amp;')
        value = value.replace('<', '&lt;')
        value = value.replace('>', '&gt;')
        value = value.replace('"', '&quot;')
        value = value.replace("'", '&#x27;')

        return value.strip()

    def sanitize_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Recursively sanitize event data"""
        sanitized = {}

        for key, value in event.items():
            if isinstance(value, str):
                sanitized[key] = self.sanitize_string(value)
            elif isinstance(value, dict):
                sanitized[key] = self.sanitize_event(value)
            elif isinstance(value, list):
                sanitized[key] = [self.sanitize_string(item) if isinstance(item, str) else item for item in value]
            else:
                sanitized[key] = value

        return sanitized

SLS02: Broken Authentication

Serverless functions often implement custom authentication mechanisms that can be easily bypassed or exploited.

Vulnerable JWT Implementation

import jwt
import json
from datetime import datetime, timedelta

def lambda_handler(event, context):
    """VULNERABLE: Weak JWT implementation"""

    # SLS02 Vulnerability #1: Weak secret key
    SECRET_KEY = "secret123"  # Weak, predictable key

    # SLS02 Vulnerability #2: No algorithm verification
    token = event['headers'].get('Authorization', '').replace('Bearer ', '')

    try:
        # Dangerous: Doesn't specify algorithm, vulnerable to none algorithm attack
        decoded = jwt.decode(token, SECRET_KEY, options={"verify_signature": False})

        # SLS02 Vulnerability #3: No token expiration validation
        # Missing: exp claim validation

        # SLS02 Vulnerability #4: No issuer/audience validation
        user_id = decoded.get('user_id')
        role = decoded.get('role', 'user')  # Default to user role

        # SLS02 Vulnerability #5: Privilege escalation via role manipulation
        if role == 'admin':
            return admin_function(user_id)
        else:
            return user_function(user_id)

    except jwt.InvalidTokenError:
        return {
            'statusCode': 401,
            'body': json.dumps({'error': 'Invalid token'})
        }

# Attack scenarios:
# 1. None algorithm attack: JWT with "alg": "none"
# 2. Weak key brute force
# 3. Token without expiration
# 4. Role escalation by modifying JWT claims

Secure JWT Implementation

import jwt
import json
import os
import boto3
from datetime import datetime, timedelta
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

class SecureJWTValidator:
    """Secure JWT validation with comprehensive security controls"""

    def __init__(self):
        # Use AWS Systems Manager for secure key storage
        self.ssm_client = boto3.client('ssm')
        self.secrets_client = boto3.client('secretsmanager')

        # Supported algorithms (explicitly no 'none')
        self.allowed_algorithms = ['RS256', 'ES256', 'HS256']

        # JWT validation options
        self.jwt_options = {
            'verify_signature': True,
            'verify_exp': True,
            'verify_nbf': True,
            'verify_iat': True,
            'verify_aud': True,
            'verify_iss': True,
            'require_exp': True,
            'require_iat': True,
            'require_nbf': False
        }

        # Token constraints
        self.max_token_age = timedelta(hours=1)
        self.allowed_issuers = [
            'https://auth.yourcompany.com',
            'https://identity.yourcompany.com'
        ]
        self.allowed_audiences = [
            'lambda-api',
            'serverless-backend'
        ]

    def get_signing_key(self) -> str:
        """Retrieve signing key from AWS Secrets Manager"""
        try:
            response = self.secrets_client.get_secret_value(
                SecretId=os.environ['JWT_SECRET_ARN']
            )

            secret = json.loads(response['SecretString'])
            return secret['signing_key']

        except Exception as e:
            logger.error(f"Failed to retrieve signing key: {str(e)}")
            raise ValueError("Authentication service unavailable")

    def validate_token_structure(self, token: str) -> Dict[str, Any]:
        """Validate JWT structure before decoding"""

        # Check token format
        parts = token.split('.')
        if len(parts) != 3:
            raise ValueError("Invalid token format")

        # Decode header to check algorithm
        try:
            import base64
            header = json.loads(base64.urlsafe_b64decode(parts[0] + '=='))
        except:
            raise ValueError("Invalid token header")

        # Verify algorithm is allowed
        algorithm = header.get('alg')
        if algorithm not in self.allowed_algorithms:
            raise ValueError(f"Algorithm {algorithm} not allowed")

        # Prevent none algorithm attack
        if algorithm.lower() == 'none':
            raise ValueError("None algorithm not permitted")

        return header

    def validate_jwt_token(self, token: str) -> Dict[str, Any]:
        """Comprehensive JWT validation"""

        if not token:
            raise ValueError("No token provided")

        # Remove Bearer prefix if present
        if token.startswith('Bearer '):
            token = token[7:]

        # Validate token structure
        header = self.validate_token_structure(token)

        # Get signing key
        signing_key = self.get_signing_key()

        try:
            # Decode and validate token
            decoded_token = jwt.decode(
                token,
                signing_key,
                algorithms=self.allowed_algorithms,
                options=self.jwt_options,
                audience=self.allowed_audiences,
                issuer=self.allowed_issuers,
                leeway=timedelta(seconds=10)  # Allow 10 seconds clock skew
            )

            # Additional custom validations
            self.validate_custom_claims(decoded_token)

            return decoded_token

        except jwt.ExpiredSignatureError:
            raise ValueError("Token has expired")
        except jwt.InvalidAudienceError:
            raise ValueError("Invalid audience")
        except jwt.InvalidIssuerError:
            raise ValueError("Invalid issuer")
        except jwt.InvalidSignatureError:
            raise ValueError("Invalid signature")
        except jwt.InvalidTokenError as e:
            raise ValueError(f"Invalid token: {str(e)}")

    def validate_custom_claims(self, claims: Dict[str, Any]):
        """Validate custom claims and business logic"""

        # Validate required claims
        required_claims = ['user_id', 'role', 'permissions', 'session_id']
        for claim in required_claims:
            if claim not in claims:
                raise ValueError(f"Missing required claim: {claim}")

        # Validate user_id format
        user_id = claims['user_id']
        if not isinstance(user_id, str) or not user_id.isalnum():
            raise ValueError("Invalid user_id format")

        # Validate role
        allowed_roles = ['user', 'moderator', 'admin']
        if claims['role'] not in allowed_roles:
            raise ValueError(f"Invalid role: {claims['role']}")

        # Validate permissions structure
        permissions = claims['permissions']
        if not isinstance(permissions, list):
            raise ValueError("Permissions must be a list")

        # Validate session (prevent token reuse after logout)
        session_id = claims['session_id']
        if not self.validate_active_session(user_id, session_id):
            raise ValueError("Session no longer active")

        # Check for privileged operations
        if claims['role'] == 'admin':
            # Additional validation for admin tokens
            if not self.validate_admin_token(claims):
                raise ValueError("Invalid admin token")

    def validate_active_session(self, user_id: str, session_id: str) -> bool:
        """Check if session is still active (not logged out)"""

        try:
            # Check against active sessions in DynamoDB
            dynamodb = boto3.resource('dynamodb')
            sessions_table = dynamodb.Table(os.environ['SESSIONS_TABLE'])

            response = sessions_table.get_item(
                Key={
                    'user_id': user_id,
                    'session_id': session_id
                }
            )

            item = response.get('Item')
            if not item:
                return False

            # Check if session is expired
            expiry = datetime.fromisoformat(item['expires_at'])
            if datetime.utcnow() > expiry:
                return False

            # Check if session is revoked
            return item.get('status') == 'active'

        except Exception as e:
            logger.error(f"Session validation error: {str(e)}")
            return False

    def validate_admin_token(self, claims: Dict[str, Any]) -> bool:
        """Additional validation for admin tokens"""

        # Admin tokens must have shorter expiration
        issued_at = datetime.fromtimestamp(claims['iat'])
        if datetime.utcnow() - issued_at > timedelta(minutes=30):
            return False

        # Admin tokens must include specific claims
        admin_claims = ['admin_level', 'approved_by', 'approval_timestamp']
        for claim in admin_claims:
            if claim not in claims:
                return False

        # Validate admin level
        admin_level = claims['admin_level']
        if admin_level not in ['level1', 'level2', 'level3']:
            return False

        return True

def lambda_handler(event, context):
    """Secure Lambda handler with robust authentication"""

    jwt_validator = SecureJWTValidator()

    try:
        # Extract token from Authorization header
        auth_header = event.get('headers', {}).get('Authorization', '')

        if not auth_header.startswith('Bearer '):
            return {
                'statusCode': 401,
                'headers': {
                    'WWW-Authenticate': 'Bearer realm="Lambda API"'
                },
                'body': json.dumps({'error': 'Bearer token required'})
            }

        # Validate JWT token
        claims = jwt_validator.validate_jwt_token(auth_header)

        # Extract user context
        user_context = {
            'user_id': claims['user_id'],
            'role': claims['role'],
            'permissions': claims['permissions'],
            'session_id': claims['session_id']
        }

        # Check permissions for requested operation
        requested_operation = event.get('pathParameters', {}).get('operation')
        if not has_permission(user_context, requested_operation):
            return {
                'statusCode': 403,
                'body': json.dumps({'error': 'Insufficient permissions'})
            }

        # Process authenticated request
        result = process_authenticated_request(event, user_context)

        return {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json',
                'Cache-Control': 'no-cache, no-store, must-revalidate',
                'Pragma': 'no-cache',
                'Expires': '0'
            },
            'body': json.dumps(result)
        }

    except ValueError as e:
        logger.warning(f"Authentication error: {str(e)}")
        return {
            'statusCode': 401,
            'body': json.dumps({'error': 'Authentication failed'})
        }

    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Internal server error'})
        }

def has_permission(user_context: Dict[str, Any], operation: str) -> bool:
    """Check if user has permission for specific operation"""

    # Define permission matrix
    permission_matrix = {
        'read_profile': ['user', 'moderator', 'admin'],
        'update_profile': ['user', 'moderator', 'admin'],
        'delete_user': ['admin'],
        'manage_roles': ['admin'],
        'view_analytics': ['moderator', 'admin']
    }

    required_roles = permission_matrix.get(operation, [])
    user_role = user_context['role']

    # Check role-based permissions
    if user_role not in required_roles:
        return False

    # Check fine-grained permissions
    user_permissions = user_context['permissions']
    operation_permission = f"operation:{operation}"

    return operation_permission in user_permissions

SLS03: Insecure Serverless Deployment Configuration

Misconfigured serverless deployments can expose sensitive data and create attack vectors.

Vulnerable Serverless Configuration

# serverless.yml - VULNERABLE configuration
service: vulnerable-lambda-api

provider:
  name: aws
  runtime: python3.9
  stage: prod
  region: us-east-1

  # SLS03 Vulnerability #1: Overly permissive IAM permissions
  iamRoleStatements:
    - Effect: Allow
      Action: "*"  # DANGEROUS: Full AWS access!
      Resource: "*"

  # SLS03 Vulnerability #2: No environment variable encryption
  environment:
    DATABASE_PASSWORD: "prod_password_123"  # Plaintext secret!
    API_KEY: "sk-1234567890abcdef"  # Exposed API key!
    JWT_SECRET: "supersecret"  # Weak secret in plaintext!

functions:
  api:
    handler: handler.lambda_handler
    # SLS03 Vulnerability #3: No timeout limits
    timeout: 900  # 15 minutes - way too long!

    # SLS03 Vulnerability #4: Excessive memory allocation
    memorySize: 3008  # Maximum memory when function might need 128MB

    # SLS03 Vulnerability #5: No reserved concurrency
    # Missing: reservedConcurrency setting

    events:
      - http:
          path: /{proxy+}
          method: ANY
          # SLS03 Vulnerability #6: No API Gateway configuration
          # Missing: request validation, throttling, authentication

    # SLS03 Vulnerability #7: No VPC configuration
    # Function has full internet access without restrictions

    # SLS03 Vulnerability #8: No dead letter queue configuration
    # Missing: onError/deadLetterQueue configuration

resources:
  Resources:
    # SLS03 Vulnerability #9: S3 bucket with public access
    DataBucket:
      Type: AWS::S3::Bucket
      Properties:
        PublicAccessBlockConfiguration:
          BlockPublicAcls: false  # DANGEROUS!
          BlockPublicPolicy: false  # DANGEROUS!
          IgnorePublicAcls: false  # DANGEROUS!
          RestrictPublicBuckets: false  # DANGEROUS!

    # SLS03 Vulnerability #10: DynamoDB without encryption
    UsersTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: users
        # Missing: Server-side encryption configuration
        # Missing: Point-in-time recovery
        # Missing: Backup configuration

Secure Serverless Configuration

# serverless.yml - SECURE configuration
service: secure-lambda-api

frameworkVersion: "3"

provider:
  name: aws
  runtime: python3.9
  stage: ${opt:stage, 'dev'}
  region: ${opt:region, 'us-east-1'}

  # Secure IAM role with least privilege principle
  iamRoleStatements:
    # DynamoDB permissions - specific table only
    - Effect: Allow
      Action:
        - dynamodb:GetItem
        - dynamodb:PutItem
        - dynamodb:UpdateItem
        - dynamodb:DeleteItem
        - dynamodb:Query
        - dynamodb:Scan
      Resource:
        - !GetAtt UsersTable.Arn
        - !Sub "${UsersTable.Arn}/index/*"

    # S3 permissions - specific bucket and operations only
    - Effect: Allow
      Action:
        - s3:GetObject
        - s3:PutObject
      Resource: !Sub "${DataBucket}/*"

    # Secrets Manager permissions - specific secrets only
    - Effect: Allow
      Action:
        - secretsmanager:GetSecretValue
      Resource:
        - !Ref DatabaseSecret
        - !Ref JWTSecret
        - !Ref APIKeySecret

    # KMS permissions for encryption/decryption
    - Effect: Allow
      Action:
        - kms:Decrypt
        - kms:GenerateDataKey
      Resource: !GetAtt LambdaKMSKey.Arn

    # CloudWatch Logs permissions
    - Effect: Allow
      Action:
        - logs:CreateLogGroup
        - logs:CreateLogStream
        - logs:PutLogEvents
      Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"

  # Secure environment variables using AWS Secrets Manager
  environment:
    STAGE: ${self:provider.stage}
    REGION: ${self:provider.region}
    USERS_TABLE: !Ref UsersTable
    DATA_BUCKET: !Ref DataBucket
    # Secrets stored in AWS Secrets Manager, not as plain text
    DATABASE_SECRET_ARN: !Ref DatabaseSecret
    JWT_SECRET_ARN: !Ref JWTSecret
    API_KEY_SECRET_ARN: !Ref APIKeySecret
    KMS_KEY_ID: !GetAtt LambdaKMSKey.Arn

  # Global function configuration
  vpc:
    securityGroupIds:
      - !Ref LambdaSecurityGroup
    subnetIds:
      - !Ref PrivateSubnet1
      - !Ref PrivateSubnet2

  # Enable tracing
  tracing:
    lambda: true
    apiGateway: true

functions:
  api:
    handler: handler.lambda_handler

    # Security configurations
    timeout: 30  # Short timeout
    memorySize: 256  # Appropriate memory allocation
    reservedConcurrency: 100  # Prevent resource exhaustion

    # Environment-specific configuration
    environment:
      LOG_LEVEL: ${param:LOG_LEVEL, 'INFO'}

    # Dead letter queue for error handling
    deadLetter:
      targetArn: !GetAtt ErrorQueue.Arn

    # Events with security configurations
    events:
      - http:
          path: /{proxy+}
          method: ANY
          # API Gateway security
          request:
            parameters:
              headers:
                Authorization: true  # Require Authorization header
          # Request validation
          requestValidatorName: RequestValidator
          # CORS configuration
          cors:
            origin: 'https://yourapp.com'
            headers:
              - Content-Type
              - X-Amz-Date
              - Authorization
              - X-Api-Key
              - X-Amz-Security-Token
            allowCredentials: true

  # Health check function with minimal permissions
  health:
    handler: health.handler
    timeout: 10
    memorySize: 128
    iamRoleStatements: []  # No additional permissions needed
    events:
      - http:
          path: /health
          method: GET

# Security-focused custom resources
resources:
  Resources:
    # KMS key for encryption
    LambdaKMSKey:
      Type: AWS::KMS::Key
      Properties:
        Description: "KMS Key for Lambda encryption"
        KeyPolicy:
          Statement:
            - Sid: Enable IAM User Permissions
              Effect: Allow
              Principal:
                AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
              Action: "kms:*"
              Resource: "*"
            - Sid: Allow Lambda service
              Effect: Allow
              Principal:
                Service: lambda.amazonaws.com
              Action:
                - kms:Decrypt
                - kms:GenerateDataKey
              Resource: "*"

    # KMS Key Alias
    LambdaKMSKeyAlias:
      Type: AWS::KMS::Alias
      Properties:
        AliasName: alias/lambda-${self:provider.stage}
        TargetKeyId: !Ref LambdaKMSKey

    # Secure S3 bucket
    DataBucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: !Sub "secure-lambda-data-${self:provider.stage}-${AWS::AccountId}"

        # Enable versioning
        VersioningConfiguration:
          Status: Enabled

        # Server-side encryption
        BucketEncryption:
          ServerSideEncryptionConfiguration:
            - ServerSideEncryptionByDefault:
                SSEAlgorithm: aws:kms
                KMSMasterKeyID: !GetAtt LambdaKMSKey.Arn
              BucketKeyEnabled: true

        # Block all public access
        PublicAccessBlockConfiguration:
          BlockPublicAcls: true
          BlockPublicPolicy: true
          IgnorePublicAcls: true
          RestrictPublicBuckets: true

        # Lifecycle policy
        LifecycleConfiguration:
          Rules:
            - Id: DeleteOldVersions
              Status: Enabled
              NoncurrentVersionExpirationInDays: 30

        # Logging configuration
        LoggingConfiguration:
          DestinationBucketName: !Ref LoggingBucket
          LogFilePrefix: access-logs/

        # Notification configuration for security monitoring
        NotificationConfiguration:
          CloudWatchConfigurations:
            - Event: s3:ObjectCreated:*
              CloudWatchConfiguration:
                LogGroupName: !Ref S3AccessLogGroup

    # Secure DynamoDB table
    UsersTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: !Sub "users-${self:provider.stage}"

        # Billing mode and capacity
        BillingMode: PAY_PER_REQUEST

        # Attribute definitions
        AttributeDefinitions:
          - AttributeName: user_id
            AttributeType: S
          - AttributeName: email
            AttributeType: S

        # Key schema
        KeySchema:
          - AttributeName: user_id
            KeyType: HASH

        # Global Secondary Indexes
        GlobalSecondaryIndexes:
          - IndexName: EmailIndex
            KeySchema:
              - AttributeName: email
                KeyType: HASH
            Projection:
              ProjectionType: ALL

        # Encryption at rest
        SSESpecification:
          SSEEnabled: true
          KMSMasterKeyId: !GetAtt LambdaKMSKey.Arn

        # Point-in-time recovery
        PointInTimeRecoverySpecification:
          PointInTimeRecoveryEnabled: true

        # Backup policy
        BackupPolicy:
          PointInTimeRecoveryEnabled: true

        # Stream specification for audit logging
        StreamSpecification:
          StreamViewType: NEW_AND_OLD_IMAGES

        # Tags
        Tags:
          - Key: Environment
            Value: ${self:provider.stage}
          - Key: Application
            Value: secure-lambda-api

    # Secrets in AWS Secrets Manager
    DatabaseSecret:
      Type: AWS::SecretsManager::Secret
      Properties:
        Name: !Sub "lambda-db-secret-${self:provider.stage}"
        Description: "Database credentials for Lambda"
        KmsKeyId: !Ref LambdaKMSKey
        SecretString: !Sub |
          {
            "username": "lambda_user",
            "password": "${param:DB_PASSWORD}",
            "host": "${param:DB_HOST}",
            "port": 5432,
            "database": "production"
          }

    JWTSecret:
      Type: AWS::SecretsManager::Secret
      Properties:
        Name: !Sub "lambda-jwt-secret-${self:provider.stage}"
        Description: "JWT signing key for Lambda"
        KmsKeyId: !Ref LambdaKMSKey
        GenerateSecretString:
          SecretStringTemplate: '{}'
          GenerateStringKey: 'signing_key'
          PasswordLength: 64
          ExcludeCharacters: '"@/\'

    APIKeySecret:
      Type: AWS::SecretsManager::Secret
      Properties:
        Name: !Sub "lambda-api-key-${self:provider.stage}"
        Description: "External API keys for Lambda"
        KmsKeyId: !Ref LambdaKMSKey
        SecretString: !Sub |
          {
            "openai_api_key": "${param:OPENAI_API_KEY}",
            "stripe_api_key": "${param:STRIPE_API_KEY}"
          }

    # VPC Configuration
    VPC:
      Type: AWS::EC2::VPC
      Properties:
        CidrBlock: 10.0.0.0/16
        EnableDnsHostnames: true
        EnableDnsSupport: true
        Tags:
          - Key: Name
            Value: !Sub "lambda-vpc-${self:provider.stage}"

    # Private subnets for Lambda functions
    PrivateSubnet1:
      Type: AWS::EC2::Subnet
      Properties:
        VpcId: !Ref VPC
        CidrBlock: 10.0.1.0/24
        AvailabilityZone: !Select [0, !GetAZs '']
        Tags:
          - Key: Name
            Value: !Sub "lambda-private-subnet-1-${self:provider.stage}"

    PrivateSubnet2:
      Type: AWS::EC2::Subnet
      Properties:
        VpcId: !Ref VPC
        CidrBlock: 10.0.2.0/24
        AvailabilityZone: !Select [1, !GetAZs '']
        Tags:
          - Key: Name
            Value: !Sub "lambda-private-subnet-2-${self:provider.stage}"

    # Security group for Lambda functions
    LambdaSecurityGroup:
      Type: AWS::EC2::SecurityGroup
      Properties:
        GroupDescription: Security group for Lambda functions
        VpcId: !Ref VPC
        # Egress rules - restrictive outbound access
        SecurityGroupEgress:
          # HTTPS to AWS services
          - IpProtocol: tcp
            FromPort: 443
            ToPort: 443
            CidrIp: 0.0.0.0/0
          # Database access
          - IpProtocol: tcp
            FromPort: 5432
            ToPort: 5432
            SourceSecurityGroupId: !Ref DatabaseSecurityGroup
        Tags:
          - Key: Name
            Value: !Sub "lambda-sg-${self:provider.stage}"

    # API Gateway Request Validator
    RequestValidator:
      Type: AWS::ApiGateway::RequestValidator
      Properties:
        Name: RequestValidator
        RestApiId: !Ref ApiGatewayRestApi
        ValidateRequestBody: true
        ValidateRequestParameters: true

    # Dead Letter Queue
    ErrorQueue:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: !Sub "lambda-errors-${self:provider.stage}"
        KmsMasterKeyId: !GetAtt LambdaKMSKey.Arn
        MessageRetentionPeriod: 1209600  # 14 days
        VisibilityTimeoutSeconds: 300

    # CloudWatch Log Groups with encryption
    LambdaLogGroup:
      Type: AWS::Logs::LogGroup
      Properties:
        LogGroupName: !Sub "/aws/lambda/secure-lambda-api-${self:provider.stage}-api"
        RetentionInDays: 30
        KmsKeyId: !GetAtt LambdaKMSKey.Arn

# Parameters for sensitive values (passed via CLI or CI/CD)
params:
  default:
    LOG_LEVEL: INFO
  prod:
    LOG_LEVEL: WARN
    DB_PASSWORD: ${ssm:/lambda/prod/db_password~true}
    DB_HOST: ${ssm:/lambda/prod/db_host}
    OPENAI_API_KEY: ${ssm:/lambda/prod/openai_key~true}
    STRIPE_API_KEY: ${ssm:/lambda/prod/stripe_key~true}

# Plugins for additional security
plugins:
  - serverless-plugin-tracing
  - serverless-plugin-aws-alerts
  - serverless-iam-roles-per-function
  - serverless-plugin-resource-tagging

SLS04: Over-Privileged Function Permissions & SLS05: Inadequate Function Monitoring and Logging

These vulnerabilities often go hand-in-hand, so I’ll address them together with a comprehensive monitoring and least-privilege implementation:

Comprehensive Security Monitoring System

# secure_lambda_with_monitoring.py
import json
import boto3
import logging
import time
from datetime import datetime, timezone
from typing import Dict, Any, Optional
import hashlib
import os

# Configure structured logging
class SecurityLogger:
    """Enhanced security-focused logging for Lambda functions"""

    def __init__(self):
        self.logger = logging.getLogger()
        self.logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))

        # Create CloudWatch client for custom metrics
        self.cloudwatch = boto3.client('cloudwatch')

        # Security event types
        self.security_events = {
            'AUTHENTICATION_FAILURE': 'security.auth.failure',
            'AUTHORIZATION_FAILURE': 'security.authz.failure',
            'SUSPICIOUS_INPUT': 'security.input.suspicious',
            'RATE_LIMIT_EXCEEDED': 'security.rate_limit.exceeded',
            'PRIVILEGE_ESCALATION': 'security.privilege.escalation',
            'DATA_ACCESS_VIOLATION': 'security.data.violation',
            'ANOMALOUS_BEHAVIOR': 'security.behavior.anomalous'
        }

    def log_security_event(self, event_type: str, details: Dict[str, Any],
                          context: Dict[str, Any], severity: str = 'WARNING'):
        """Log security events with structured format"""

        # Create security log entry
        security_log = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'event_type': event_type,
            'severity': severity,
            'lambda_function': context.get('function_name', 'unknown'),
            'request_id': context.get('aws_request_id', 'unknown'),
            'details': details,
            'metadata': {
                'region': os.environ.get('AWS_REGION'),
                'stage': os.environ.get('STAGE'),
                'version': context.get('function_version', 'unknown')
            }
        }

        # Log to CloudWatch
        if severity in ['ERROR', 'CRITICAL']:
            self.logger.error(json.dumps(security_log))
        elif severity == 'WARNING':
            self.logger.warning(json.dumps(security_log))
        else:
            self.logger.info(json.dumps(security_log))

        # Send custom metric to CloudWatch
        self.send_security_metric(event_type, severity)

        # Send to security event queue for real-time alerting
        if severity in ['ERROR', 'CRITICAL']:
            self.send_to_security_queue(security_log)

    def send_security_metric(self, event_type: str, severity: str):
        """Send custom security metrics to CloudWatch"""

        try:
            self.cloudwatch.put_metric_data(
                Namespace='Lambda/Security',
                MetricData=[
                    {
                        'MetricName': 'SecurityEvents',
                        'Dimensions': [
                            {
                                'Name': 'EventType',
                                'Value': event_type
                            },
                            {
                                'Name': 'Severity',
                                'Value': severity
                            },
                            {
                                'Name': 'FunctionName',
                                'Value': os.environ.get('AWS_LAMBDA_FUNCTION_NAME', 'unknown')
                            }
                        ],
                        'Value': 1,
                        'Unit': 'Count',
                        'Timestamp': datetime.now(timezone.utc)
                    }
                ]
            )
        except Exception as e:
            self.logger.error(f"Failed to send security metric: {str(e)}")

    def send_to_security_queue(self, security_log: Dict[str, Any]):
        """Send critical security events to SQS for real-time processing"""

        try:
            sqs = boto3.client('sqs')
            queue_url = os.environ.get('SECURITY_ALERTS_QUEUE')

            if queue_url:
                sqs.send_message(
                    QueueUrl=queue_url,
                    MessageBody=json.dumps(security_log),
                    MessageAttributes={
                        'EventType': {
                            'StringValue': security_log['event_type'],
                            'DataType': 'String'
                        },
                        'Severity': {
                            'StringValue': security_log['severity'],
                            'DataType': 'String'
                        }
                    }
                )
        except Exception as e:
            self.logger.error(f"Failed to send to security queue: {str(e)}")

class PrivilegeManager:
    """Implement fine-grained privilege management"""

    def __init__(self):
        self.security_logger = SecurityLogger()

        # Define operation-specific permissions
        self.operation_permissions = {
            'read_user_profile': {
                'required_role': 'user',
                'aws_permissions': ['dynamodb:GetItem'],
                'resource_pattern': 'users-table',
                'data_classification': 'internal'
            },
            'update_user_profile': {
                'required_role': 'user',
                'aws_permissions': ['dynamodb:UpdateItem'],
                'resource_pattern': 'users-table',
                'data_classification': 'internal',
                'owner_only': True
            },
            'delete_user': {
                'required_role': 'admin',
                'aws_permissions': ['dynamodb:DeleteItem'],
                'resource_pattern': 'users-table',
                'data_classification': 'confidential'
            },
            'export_user_data': {
                'required_role': 'admin',
                'aws_permissions': ['dynamodb:Scan', 's3:PutObject'],
                'resource_pattern': ['users-table', 'export-bucket'],
                'data_classification': 'confidential',
                'audit_required': True
            }
        }

    def check_operation_permissions(self, operation: str, user_context: Dict[str, Any],
                                  resource_context: Dict[str, Any]) -> Dict[str, Any]:
        """Check if user has permissions for specific operation"""

        permission_check = {
            'allowed': False,
            'reason': '',
            'audit_required': False,
            'restrictions': []
        }

        if operation not in self.operation_permissions:
            permission_check['reason'] = f"Unknown operation: {operation}"
            self.security_logger.log_security_event(
                'AUTHORIZATION_FAILURE',
                {'operation': operation, 'reason': 'unknown_operation'},
                user_context,
                'ERROR'
            )
            return permission_check

        op_config = self.operation_permissions[operation]

        # Check role requirements
        required_role = op_config['required_role']
        user_role = user_context.get('role', 'guest')

        role_hierarchy = {'guest': 0, 'user': 1, 'moderator': 2, 'admin': 3}

        if role_hierarchy.get(user_role, 0) < role_hierarchy.get(required_role, 999):
            permission_check['reason'] = f"Insufficient role: {user_role} < {required_role}"
            self.security_logger.log_security_event(
                'AUTHORIZATION_FAILURE',
                {
                    'operation': operation,
                    'user_role': user_role,
                    'required_role': required_role
                },
                user_context,
                'WARNING'
            )
            return permission_check

        # Check owner-only restrictions
        if op_config.get('owner_only', False):
            user_id = user_context.get('user_id')
            resource_owner = resource_context.get('owner_id')

            if user_id != resource_owner and user_role != 'admin':
                permission_check['reason'] = "Owner-only operation"
                self.security_logger.log_security_event(
                    'AUTHORIZATION_FAILURE',
                    {
                        'operation': operation,
                        'user_id': user_id,
                        'resource_owner': resource_owner
                    },
                    user_context,
                    'WARNING'
                )
                return permission_check

        # Check data classification restrictions
        data_classification = op_config.get('data_classification', 'public')
        user_clearance = user_context.get('data_clearance', 'public')

        clearance_levels = {'public': 0, 'internal': 1, 'confidential': 2, 'secret': 3}

        if clearance_levels.get(user_clearance, 0) < clearance_levels.get(data_classification, 0):
            permission_check['reason'] = f"Insufficient clearance: {user_clearance} < {data_classification}"
            self.security_logger.log_security_event(
                'AUTHORIZATION_FAILURE',
                {
                    'operation': operation,
                    'user_clearance': user_clearance,
                    'required_clearance': data_classification
                },
                user_context,
                'ERROR'
            )
            return permission_check

        # Set audit requirements
        permission_check['audit_required'] = op_config.get('audit_required', False)

        # Operation is allowed
        permission_check['allowed'] = True
        permission_check['reason'] = 'Permission granted'

        return permission_check

class SecurityAuditLogger:
    """Comprehensive audit logging for compliance"""

    def __init__(self):
        self.dynamodb = boto3.resource('dynamodb')
        self.audit_table = self.dynamodb.Table(os.environ.get('AUDIT_TABLE', 'security-audit'))
        self.s3_client = boto3.client('s3')
        self.audit_bucket = os.environ.get('AUDIT_BUCKET')

    def log_audit_event(self, event_type: str, user_context: Dict[str, Any],
                       operation_details: Dict[str, Any], lambda_context: Dict[str, Any]):
        """Log audit events for compliance and forensics"""

        # Create audit record
        audit_record = {
            'audit_id': self.generate_audit_id(user_context, lambda_context),
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'event_type': event_type,
            'user_id': user_context.get('user_id', 'anonymous'),
            'user_role': user_context.get('role', 'unknown'),
            'session_id': user_context.get('session_id', 'unknown'),
            'function_name': lambda_context.get('function_name'),
            'request_id': lambda_context.get('aws_request_id'),
            'operation_details': operation_details,
            'source_ip': operation_details.get('source_ip', 'unknown'),
            'user_agent': operation_details.get('user_agent', 'unknown'),
            'ttl': int(time.time()) + (365 * 24 * 3600)  # Retain for 1 year
        }

        # Store in DynamoDB
        try:
            self.audit_table.put_item(Item=audit_record)
        except Exception as e:
            logging.error(f"Failed to store audit record: {str(e)}")

        # Also store in S3 for long-term retention and analysis
        if self.audit_bucket:
            self.store_audit_in_s3(audit_record)

    def generate_audit_id(self, user_context: Dict[str, Any], lambda_context: Dict[str, Any]) -> str:
        """Generate unique audit ID"""

        content = f"{user_context.get('user_id')}-{lambda_context.get('aws_request_id')}-{time.time()}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def store_audit_in_s3(self, audit_record: Dict[str, Any]):
        """Store audit record in S3 for long-term retention"""

        try:
            # Organize by date for efficient querying
            date_prefix = datetime.now(timezone.utc).strftime('%Y/%m/%d')
            s3_key = f"audit-logs/{date_prefix}/{audit_record['audit_id']}.json"

            self.s3_client.put_object(
                Bucket=self.audit_bucket,
                Key=s3_key,
                Body=json.dumps(audit_record),
                ServerSideEncryption='aws:kms',
                SSEKMSKeyId=os.environ.get('AUDIT_KMS_KEY'),
                ContentType='application/json'
            )
        except Exception as e:
            logging.error(f"Failed to store audit record in S3: {str(e)}")

def lambda_handler(event, context):
    """Secure Lambda handler with comprehensive monitoring and least privilege"""

    # Initialize security components
    security_logger = SecurityLogger()
    privilege_manager = PrivilegeManager()
    audit_logger = SecurityAuditLogger()

    # Extract request context
    request_context = {
        'source_ip': event.get('requestContext', {}).get('identity', {}).get('sourceIp', 'unknown'),
        'user_agent': event.get('headers', {}).get('User-Agent', 'unknown'),
        'method': event.get('httpMethod', 'unknown'),
        'path': event.get('path', 'unknown'),
        'query_params': event.get('queryStringParameters', {}),
        'headers': {k: v for k, v in event.get('headers', {}).items() if k.lower() not in ['authorization', 'cookie']}
    }

    lambda_context = {
        'function_name': context.function_name,
        'aws_request_id': context.aws_request_id,
        'function_version': context.function_version,
        'remaining_time': context.get_remaining_time_in_millis()
    }

    try:
        # Step 1: Authenticate user (extract from JWT token)
        auth_result = authenticate_user(event)
        if not auth_result['success']:
            security_logger.log_security_event(
                'AUTHENTICATION_FAILURE',
                {
                    'reason': auth_result['reason'],
                    'source_ip': request_context['source_ip']
                },
                lambda_context,
                'WARNING'
            )
            return {
                'statusCode': 401,
                'body': json.dumps({'error': 'Authentication failed'})
            }

        user_context = auth_result['user_context']

        # Step 2: Determine requested operation
        requested_operation = extract_operation(event)

        # Step 3: Check permissions
        permission_result = privilege_manager.check_operation_permissions(
            requested_operation,
            user_context,
            request_context
        )

        if not permission_result['allowed']:
            security_logger.log_security_event(
                'AUTHORIZATION_FAILURE',
                {
                    'operation': requested_operation,
                    'reason': permission_result['reason'],
                    'user_id': user_context.get('user_id')
                },
                lambda_context,
                'WARNING'
            )
            return {
                'statusCode': 403,
                'body': json.dumps({'error': 'Access denied'})
            }

        # Step 4: Log audit event if required
        if permission_result['audit_required']:
            audit_logger.log_audit_event(
                'OPERATION_EXECUTED',
                user_context,
                {
                    'operation': requested_operation,
                    **request_context
                },
                lambda_context
            )

        # Step 5: Execute operation with monitoring
        start_time = time.time()

        result = execute_secure_operation(
            requested_operation,
            event,
            user_context,
            security_logger,
            lambda_context
        )

        execution_time = time.time() - start_time

        # Step 6: Log successful operation
        security_logger.log_security_event(
            'OPERATION_SUCCESS',
            {
                'operation': requested_operation,
                'user_id': user_context.get('user_id'),
                'execution_time': execution_time
            },
            lambda_context,
            'INFO'
        )

        # Step 7: Return secure response
        return {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json',
                'X-Content-Type-Options': 'nosniff',
                'X-Frame-Options': 'DENY',
                'X-XSS-Protection': '1; mode=block',
                'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
                'Cache-Control': 'no-cache, no-store, must-revalidate'
            },
            'body': json.dumps({
                'success': True,
                'data': result,
                'request_id': context.aws_request_id
            })
        }

    except Exception as e:
        # Log error with full context for debugging
        security_logger.log_security_event(
            'OPERATION_ERROR',
            {
                'error': str(e),
                'error_type': type(e).__name__,
                'user_id': user_context.get('user_id', 'unknown') if 'user_context' in locals() else 'unknown',
                'operation': requested_operation if 'requested_operation' in locals() else 'unknown'
            },
            lambda_context,
            'ERROR'
        )

        return {
            'statusCode': 500,
            'headers': {
                'Content-Type': 'application/json'
            },
            'body': json.dumps({
                'error': 'Internal server error',
                'request_id': context.aws_request_id
            })
        }

def execute_secure_operation(operation: str, event: Dict[str, Any],
                           user_context: Dict[str, Any], security_logger: SecurityLogger,
                           lambda_context: Dict[str, Any]) -> Any:
    """Execute operations with security monitoring"""

    # Operation routing with security checks
    if operation == 'read_user_profile':
        return read_user_profile_secure(event, user_context, security_logger)
    elif operation == 'update_user_profile':
        return update_user_profile_secure(event, user_context, security_logger)
    elif operation == 'delete_user':
        return delete_user_secure(event, user_context, security_logger)
    else:
        raise ValueError(f"Unknown operation: {operation}")

Conclusion: Building Secure Serverless Applications

The OWASP Serverless Top 10 provides a critical framework for understanding and mitigating serverless-specific security risks. As we’ve seen through these examples, securing serverless applications requires:

  1. Input Validation and Sanitization: Never trust event data without thorough validation
  2. Robust Authentication and Authorization: Implement proper JWT validation with comprehensive security checks
  3. Secure Configuration Management: Use AWS Secrets Manager, KMS encryption, and least-privilege IAM policies
  4. Comprehensive Monitoring: Implement detailed logging, metrics, and real-time alerting
  5. Defense in Depth: Layer multiple security controls to protect against sophisticated attacks

Key implementation principles:

  • Least Privilege: Grant only the minimum permissions required for each function
  • Fail Secure: Default to denying access when in doubt
  • Comprehensive Auditing: Log all security-relevant events for compliance and forensics
  • Real-time Monitoring: Detect and respond to security incidents quickly
  • Regular Security Reviews: Continuously assess and improve security posture

The serverless paradigm offers tremendous benefits in terms of scalability and operational simplicity, but it also requires a new approach to security. By following the OWASP Serverless Top 10 guidelines and implementing the security patterns shown in this guide, you can build robust, secure serverless applications that protect your organization and users.

Remember: in serverless security, paranoia is a feature, not a bug.