Serverless Runtime Security: Protecting the Invisible Infrastructure

In serverless computing, the runtime is your last line of defense. AWS patches and operates the underlying infrastructure and managed runtimes, but under the shared responsibility model everything that happens inside the execution environment, including your code, dependencies, secrets, and data, remains your responsibility. This invisible layer, where your code executes, secrets are processed, and data flows, is one of the most critical and most overlooked attack surfaces in modern cloud applications.

This comprehensive guide explores advanced serverless runtime security techniques, from hardening execution environments to detecting sophisticated attacks in real-time.

The Serverless Runtime Threat Model

Understanding the unique attack vectors targeting serverless runtimes is crucial for effective defense:

Lambda Runtime Architecture Vulnerabilities

# runtime_threat_analysis.py - Analyze runtime-specific threats
import os
import re
import sys
import builtins
import inspect
import subprocess
import json
import time
from typing import Dict, Any, List
import logging
import hashlib

logger = logging.getLogger()

class RuntimeThreatAnalyzer:
    """Analyze and detect runtime-specific security threats"""

    def __init__(self):
        # Initialize threat detection signatures
        self.threat_signatures = {
            'code_injection': [
                r'eval\s*\(',
                r'exec\s*\(',
                r'compile\s*\(',
                r'__import__\s*\(',
                r'getattr\s*\(',
                r'setattr\s*\(',
            ],
            'command_injection': [
                r'subprocess\.',
                r'os\.system',
                r'os\.popen',
                r'commands\.',
                r'shell=True'
            ],
            'path_traversal': [
                r'\.\./',
                r'\.\.\\',
                r'/etc/passwd',
                r'/proc/',
                r'~/'
            ],
            'environment_manipulation': [
                r'os\.environ',
                r'sys\.path',
                r'sys\.modules',
                r'globals\s*\(',
                r'locals\s*\('
            ]
        }

        # Runtime monitoring state
        self.runtime_state = {
            'start_time': time.time(),
            'function_calls': [],
            'file_access_attempts': [],
            'network_connections': [],
            'environment_changes': []
        }
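
    def scan_payload_for_threats(self, payload: str) -> List[Dict[str, Any]]:
        """Apply the signature dictionary above to an arbitrary payload.

        Illustrative sketch: the section defines the signatures but does not show
        a scanner, so this helper and its name are additions for demonstration,
        not part of the published analyzer.
        """
        findings = []
        for category, patterns in self.threat_signatures.items():
            for pattern in patterns:
                match = re.search(pattern, payload)
                if match:
                    findings.append({
                        'category': category,
                        'pattern': pattern,
                        'matched_text': match.group(0)[:50]  # Truncate for safe logging
                    })
        return findings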

    def analyze_runtime_environment(self, context) -> Dict[str, Any]:
        """Comprehensive runtime environment analysis"""

        environment_analysis = {
            'timestamp': time.time(),
            'function_name': getattr(context, 'function_name', 'unknown'),
            'function_version': getattr(context, 'function_version', 'unknown'),
            'remaining_time': getattr(context, 'get_remaining_time_in_millis', lambda: 0)(),
            'memory_limit': getattr(context, 'memory_limit_in_mb', 'unknown'),
            'request_id': getattr(context, 'aws_request_id', 'unknown')
        }

        # Analyze environment variables for sensitive data exposure
        environment_analysis['env_analysis'] = self.analyze_environment_variables()

        # Check for suspicious modules loaded
        environment_analysis['module_analysis'] = self.analyze_loaded_modules()

        # Analyze file system permissions
        environment_analysis['filesystem_analysis'] = self.analyze_filesystem_permissions()

        # Check for runtime modifications
        environment_analysis['runtime_modifications'] = self.detect_runtime_modifications()

        return environment_analysis

    def analyze_environment_variables(self) -> Dict[str, Any]:
        """Analyze environment variables for security issues"""

        env_analysis = {
            'total_vars': len(os.environ),
            'suspicious_vars': [],
            'exposed_secrets': [],
            'aws_vars': {}
        }

        # Patterns that might indicate secrets
        secret_patterns = [
            r'password', r'passwd', r'pwd',
            r'secret', r'key', r'token',
            r'api[_-]?key', r'access[_-]?key',
            r'private[_-]?key', r'auth'
        ]

        for var_name, var_value in os.environ.items():
            var_lower = var_name.lower()

            # Check for AWS-specific variables
            if var_name.startswith(('AWS_', 'LAMBDA_')):
                env_analysis['aws_vars'][var_name] = len(var_value)

            # Check for potential secrets
            for pattern in secret_patterns:
                if re.search(pattern, var_lower):
                    env_analysis['suspicious_vars'].append({
                        'name': var_name,
                        'pattern': pattern,
                        'value_length': len(var_value),
                        'starts_with': var_value[:10] if var_value else '',
                        'risk_level': 'high' if len(var_value) > 20 else 'medium'
                    })

            # Check for exposed secrets (actual secret values)
            if self.is_potential_secret(var_value):
                env_analysis['exposed_secrets'].append({
                    'name': var_name,
                    'type': self.classify_secret_type(var_value),
                    'value_hash': hashlib.sha256(var_value.encode()).hexdigest()[:16]
                })

        return env_analysis

    def analyze_loaded_modules(self) -> Dict[str, Any]:
        """Analyze loaded Python modules for suspicious activity"""

        module_analysis = {
            'total_modules': len(sys.modules),
            'suspicious_modules': [],
            'dangerous_modules': [],
            'recently_loaded': []
        }

        # Modules commonly abused for code execution (eval, exec and compile are
        # builtins rather than modules, so they are covered by the regex signatures
        # instead of this list)
        dangerous_modules = [
            'subprocess', 'os', 'sys', 'importlib', 'runpy', 'code'
        ]

        # Suspicious modules (often used in attacks)
        suspicious_modules = [
            'pickle', 'marshal', 'types', 'ctypes',
            'gc', 'inspect', 'ast', 'dis'
        ]

        for module_name, module_obj in sys.modules.items():
            if module_name in dangerous_modules:
                module_analysis['dangerous_modules'].append({
                    'name': module_name,
                    'file': getattr(module_obj, '__file__', 'built-in'),
                    'version': getattr(module_obj, '__version__', 'unknown')
                })

            if module_name in suspicious_modules:
                module_analysis['suspicious_modules'].append({
                    'name': module_name,
                    'file': getattr(module_obj, '__file__', 'built-in')
                })

            # Check for recently written module files (st_mtime is the file's
            # modification time, so this flags code dropped onto disk at runtime
            # rather than the moment a module was imported)
            if hasattr(module_obj, '__file__') and module_obj.__file__:
                try:
                    stat = os.stat(module_obj.__file__)
                    modified_time = stat.st_mtime

                    # If the file was written within the last 60 seconds
                    if time.time() - modified_time < 60:
                        module_analysis['recently_loaded'].append({
                            'name': module_name,
                            'file': module_obj.__file__,
                            'load_time': modified_time
                        })
                except OSError:
                    pass

        return module_analysis

    def analyze_filesystem_permissions(self) -> Dict[str, Any]:
        """Analyze filesystem access patterns and permissions"""

        filesystem_analysis = {
            'writable_paths': [],
            'sensitive_files': [],
            'temp_directory_usage': {},
            'unusual_access_patterns': []
        }

        # Check common writable locations
        writable_locations = ['/tmp', '/dev/shm', '/var/tmp']

        for location in writable_locations:
            if os.path.exists(location) and os.access(location, os.W_OK):
                try:
                    files = os.listdir(location)
                    filesystem_analysis['writable_paths'].append({
                        'path': location,
                        'writable': True,
                        'file_count': len(files),
                        'files': files[:10]  # First 10 files
                    })
                except PermissionError:
                    filesystem_analysis['writable_paths'].append({
                        'path': location,
                        'writable': True,
                        'accessible': False
                    })

        # Check for sensitive file access
        sensitive_files = [
            '/etc/passwd', '/etc/shadow', '/etc/hosts',
            '/proc/version', '/proc/cpuinfo', '/proc/meminfo'
        ]

        for file_path in sensitive_files:
            if os.path.exists(file_path):
                try:
                    with open(file_path, 'r') as f:
                        content = f.read(100)  # First 100 chars
                    filesystem_analysis['sensitive_files'].append({
                        'path': file_path,
                        'accessible': True,
                        'content_preview': content
                    })
                except PermissionError:
                    filesystem_analysis['sensitive_files'].append({
                        'path': file_path,
                        'accessible': False
                    })

        return filesystem_analysis

    def detect_runtime_modifications(self) -> Dict[str, Any]:
        """Detect modifications to the runtime environment"""

        modifications = {
            'sys_path_changes': [],
            'module_patches': [],
            'builtin_modifications': [],
            'global_namespace_pollution': []
        }

        # Check sys.path modifications
        original_sys_path = [
            '/var/runtime',
            '/opt/python',
            '/var/task',
            '/opt/python/lib/python3.9/site-packages'
        ]

        current_paths = sys.path
        for path in current_paths:
            if path not in original_sys_path and path:
                modifications['sys_path_changes'].append(path)

        # Check for monkey patching of built-in functions
        dangerous_builtins = ['eval', 'exec', 'compile', '__import__']
        for builtin_name in dangerous_builtins:
            builtin_func = getattr(builtins, builtin_name, None)
            if builtin_func is not None and hasattr(builtin_func, '__wrapped__'):
                # functools.wraps sets __wrapped__, a common sign of patching
                modifications['builtin_modifications'].append({
                    'name': builtin_name,
                    'wrapped': True,
                    'wrapper': str(builtin_func)
                })

        return modifications

    def is_potential_secret(self, value: str) -> bool:
        """Determine if a value looks like a secret"""

        if not isinstance(value, str) or len(value) < 10:
            return False

        # Check for common secret patterns
        secret_indicators = [
            len(value) > 20,  # Long values
            any(c.isupper() and c.islower() for c in value),  # Mixed case
            any(c.isdigit() for c in value),  # Contains numbers
            any(c in '!@#$%^&*()_+-=' for c in value),  # Special characters
        ]

        return sum(secret_indicators) >= 2

    def classify_secret_type(self, value: str) -> str:
        """Classify the type of secret"""

        if value.startswith('sk-'):
            return 'openai_api_key'
        elif value.startswith('xoxb-'):
            return 'slack_bot_token'
        elif value.startswith('ghp_'):
            return 'github_personal_token'
        elif len(value) == 32 and all(c in '0123456789abcdef' for c in value):
            return 'md5_hash'
        elif len(value) == 64 and all(c in '0123456789abcdef' for c in value):
            return 'sha256_hash'
        elif 'AKIA' in value:
            return 'aws_access_key'
        else:
            return 'unknown'

class RuntimeSecurityMonitor:
    """Real-time runtime security monitoring"""

    def __init__(self):
        self.threat_analyzer = RuntimeThreatAnalyzer()
        self.security_events = []
        self.monitoring_enabled = True

        # Hook into critical functions
        self.setup_security_hooks()

    def setup_security_hooks(self):
        """Set up hooks for monitoring critical operations"""

        # Keep references to the real functions before wrapping them. Using the
        # builtins module avoids the dict-vs-module ambiguity of __builtins__.
        self.original_eval = builtins.eval
        self.original_exec = builtins.exec
        self.original_open = builtins.open

        # Replace with monitored versions
        builtins.eval = self.monitored_eval
        builtins.exec = self.monitored_exec

    def monitored_eval(self, expression, globals=None, locals=None):
        """Monitored version of eval function"""

        self.log_security_event(
            'DANGEROUS_FUNCTION_CALL',
            {
                'function': 'eval',
                'expression': str(expression)[:100],
                'caller': self.get_caller_info()
            },
            'HIGH'
        )

        # Allow evaluation but log it
        return self.original_eval(expression, globals, locals)

    def monitored_exec(self, object, globals=None, locals=None):
        """Monitored version of exec function"""

        self.log_security_event(
            'DANGEROUS_FUNCTION_CALL',
            {
                'function': 'exec',
                'object': str(object)[:100],
                'caller': self.get_caller_info()
            },
            'HIGH'
        )

        # Allow execution but log it
        return self.original_exec(object, globals, locals)

    def get_caller_info(self) -> Dict[str, Any]:
        """Get information about the calling function"""

        frame = inspect.currentframe()
        try:
            # Go up the stack to find the actual caller
            caller_frame = frame.f_back.f_back

            return {
                'filename': caller_frame.f_code.co_filename,
                'function_name': caller_frame.f_code.co_name,
                'line_number': caller_frame.f_lineno,
                'locals_count': len(caller_frame.f_locals)
            }
        except:
            return {'error': 'Could not determine caller'}
        finally:
            del frame

    def log_security_event(self, event_type: str, details: Dict[str, Any], severity: str):
        """Log security events for analysis"""

        security_event = {
            'timestamp': time.time(),
            'event_type': event_type,
            'details': details,
            'severity': severity,
            'runtime_context': {
                'memory_usage': self.get_memory_usage(),
                'execution_time': time.time() - self.threat_analyzer.runtime_state['start_time']
            }
        }

        self.security_events.append(security_event)

        # Send to CloudWatch for real-time alerting
        self.send_security_alert(security_event)

    def get_memory_usage(self) -> Dict[str, Any]:
        """Get current memory usage statistics"""

        try:
            import psutil
            process = psutil.Process()
            memory_info = process.memory_info()

            return {
                'rss': memory_info.rss,
                'vms': memory_info.vms,
                'percent': process.memory_percent()
            }
        except ImportError:
            # psutil not available, use basic metrics
            import resource
            usage = resource.getrusage(resource.RUSAGE_SELF)

            return {
                'max_rss': usage.ru_maxrss,
                'shared_memory': usage.ru_ixrss,
                'data_memory': usage.ru_idrss
            }

    def send_security_alert(self, security_event: Dict[str, Any]):
        """Send security alerts to monitoring systems"""

        if security_event['severity'] in ['HIGH', 'CRITICAL']:
            try:
                # Send to CloudWatch
                import boto3
                cloudwatch = boto3.client('cloudwatch')

                cloudwatch.put_metric_data(
                    Namespace='Lambda/Security',
                    MetricData=[
                        {
                            'MetricName': 'SecurityEvents',
                            'Dimensions': [
                                {
                                    'Name': 'EventType',
                                    'Value': security_event['event_type']
                                },
                                {
                                    'Name': 'Severity',
                                    'Value': security_event['severity']
                                }
                            ],
                            'Value': 1,
                            'Unit': 'Count'
                        }
                    ]
                )

                # Send to SQS for immediate processing
                if security_event['severity'] == 'CRITICAL':
                    sqs = boto3.client('sqs')
                    queue_url = os.environ.get('SECURITY_ALERTS_QUEUE')

                    if queue_url:
                        sqs.send_message(
                            QueueUrl=queue_url,
                            MessageBody=json.dumps(security_event),
                            MessageAttributes={
                                'Severity': {
                                    'StringValue': security_event['severity'],
                                    'DataType': 'String'
                                }
                            }
                        )

            except Exception as e:
                logger.error(f"Failed to send security alert: {e}")

    def get_security_report(self) -> Dict[str, Any]:
        """Generate comprehensive security report"""

        report = {
            'monitoring_duration': time.time() - self.threat_analyzer.runtime_state['start_time'],
            'total_events': len(self.security_events),
            'events_by_severity': {},
            'events_by_type': {},
            'recent_events': self.security_events[-10:],  # Last 10 events
            'recommendations': []
        }

        # Group events by severity
        for event in self.security_events:
            severity = event['severity']
            report['events_by_severity'][severity] = report['events_by_severity'].get(severity, 0) + 1

            event_type = event['event_type']
            report['events_by_type'][event_type] = report['events_by_type'].get(event_type, 0) + 1

        # Generate recommendations
        if report['events_by_severity'].get('HIGH', 0) > 0:
            report['recommendations'].append('Review high-severity security events immediately')

        if report['events_by_type'].get('DANGEROUS_FUNCTION_CALL', 0) > 3:
            report['recommendations'].append('Consider code review for excessive use of dangerous functions')

        return report

# Global security monitor instance
security_monitor = RuntimeSecurityMonitor()

def secure_lambda_handler(event, context):
    """Lambda handler with comprehensive runtime security monitoring"""

    try:
        # Initialize runtime analysis
        runtime_analysis = security_monitor.threat_analyzer.analyze_runtime_environment(context)

        # Log runtime initialization
        security_monitor.log_security_event(
            'RUNTIME_INITIALIZED',
            runtime_analysis,
            'INFO'
        )

        # Process the actual request
        request_type = event.get('type', 'unknown')

        if request_type == 'security_analysis':
            # Return security analysis
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'runtime_analysis': runtime_analysis,
                    'security_report': security_monitor.get_security_report()
                })
            }

        # Example of dangerous operation that would be detected
        elif request_type == 'dangerous_operation':
            # This would trigger security monitoring
            user_input = event.get('code', 'print("Hello World")')

            # DANGEROUS: This would be logged and monitored
            exec(user_input)

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Dangerous operation completed',
                    'security_events': len(security_monitor.security_events)
                })
            }

        # Normal operation
        else:
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Request processed safely',
                    'runtime_analysis': runtime_analysis
                })
            }

    except Exception as e:
        # Log security exception
        security_monitor.log_security_event(
            'RUNTIME_EXCEPTION',
            {
                'error': str(e),
                'error_type': type(e).__name__,
                'caller': security_monitor.get_caller_info()
            },
            'HIGH'
        )

        logger.error(f"Runtime security error: {e}")

        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': 'Internal server error',
                'security_events': len(security_monitor.security_events)
            })
        }
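
To exercise the monitor outside AWS, invoke the handler locally with a stub context object. The snippet below is a minimal smoke test under that assumption; the FakeContext class, its field values, and the file name are illustrative and not part of the Lambda SDK.

# local_monitor_test.py - minimal local smoke test for the runtime monitor (illustrative)
import json

from runtime_threat_analysis import secure_lambda_handler

class FakeContext:
    """Stand-in exposing only the context attributes the analyzer reads"""
    function_name = 'security-demo'
    function_version = '$LATEST'
    memory_limit_in_mb = 256
    aws_request_id = 'local-test-request'

    def get_remaining_time_in_millis(self):
        return 30000

if __name__ == '__main__':
    response = secure_lambda_handler({'type': 'security_analysis'}, FakeContext())
    print(json.dumps(json.loads(response['body']), indent=2))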

Advanced Runtime Hardening Techniques

Container Security and Sandboxing

# runtime_hardening.py - Advanced runtime hardening
import os
import sys
import builtins
import hashlib
import resource
import signal
import tempfile
import subprocess
import time
from typing import Dict, Any, Optional
import logging

# Reuse the global security monitor defined in runtime_threat_analysis.py
from runtime_threat_analysis import security_monitor

logger = logging.getLogger()

class RuntimeHardeningManager:
    """Implement runtime hardening techniques for Lambda functions"""

    def __init__(self):
        self.security_limits = {
            'max_memory': 256 * 1024 * 1024,  # 256MB
            'max_cpu_time': 30,  # 30 seconds
            'max_open_files': 100,
            'max_processes': 10,
            'max_file_size': 10 * 1024 * 1024  # 10MB
        }

        self.restricted_modules = [
            'ctypes', 'subprocess', 'multiprocessing',
            'threading', 'socket', 'urllib', 'http'
        ]

        # Apply hardening on initialization
        self.apply_resource_limits()
        self.setup_signal_handlers()
        self.restrict_module_imports()

    def apply_resource_limits(self):
        """Apply resource limits to the runtime environment"""

        try:
            # Memory limit
            resource.setrlimit(
                resource.RLIMIT_AS,
                (self.security_limits['max_memory'], self.security_limits['max_memory'])
            )

            # CPU time limit
            resource.setrlimit(
                resource.RLIMIT_CPU,
                (self.security_limits['max_cpu_time'], self.security_limits['max_cpu_time'])
            )

            # File descriptor limit
            resource.setrlimit(
                resource.RLIMIT_NOFILE,
                (self.security_limits['max_open_files'], self.security_limits['max_open_files'])
            )

            # Process limit
            resource.setrlimit(
                resource.RLIMIT_NPROC,
                (self.security_limits['max_processes'], self.security_limits['max_processes'])
            )

            # File size limit
            resource.setrlimit(
                resource.RLIMIT_FSIZE,
                (self.security_limits['max_file_size'], self.security_limits['max_file_size'])
            )

            logger.info("Runtime resource limits applied successfully")

        except Exception as e:
            logger.error(f"Failed to apply resource limits: {e}")

    def setup_signal_handlers(self):
        """Set up signal handlers for security monitoring"""

        def security_signal_handler(signum, frame):
            """Handle security-related signals"""

            signal_names = {
                signal.SIGALRM: 'ALARM',
                signal.SIGXCPU: 'CPU_LIMIT_EXCEEDED',
                signal.SIGXFSZ: 'FILE_SIZE_LIMIT_EXCEEDED'
            }

            signal_name = signal_names.get(signum, f'SIGNAL_{signum}')

            logger.critical(f"Security signal received: {signal_name}")

            # Log security incident
            security_monitor.log_security_event(
                'RESOURCE_LIMIT_EXCEEDED',
                {
                    'signal': signal_name,
                    'frame_info': {
                        'filename': frame.f_code.co_filename,
                        'function': frame.f_code.co_name,
                        'line': frame.f_lineno
                    }
                },
                'CRITICAL'
            )

            # Terminate execution
            sys.exit(1)

        # Register signal handlers
        signal.signal(signal.SIGALRM, security_signal_handler)  # Alarm
        signal.signal(signal.SIGXCPU, security_signal_handler)  # CPU limit
        signal.signal(signal.SIGXFSZ, security_signal_handler)  # File size limit

    def restrict_module_imports(self):
        """Restrict imports of dangerous modules"""

        original_import = builtins.__import__

        def restricted_import(name, globals=None, locals=None, fromlist=(), level=0):
            """Restricted import function"""

            # Check if module is restricted
            if name in self.restricted_modules:
                # Log the attempt
                security_monitor.log_security_event(
                    'RESTRICTED_MODULE_IMPORT',
                    {
                        'module': name,
                        'caller': security_monitor.get_caller_info()
                    },
                    'HIGH'
                )

                # Allow import but with warning
                logger.warning(f"Restricted module imported: {name}")

            return original_import(name, globals, locals, fromlist, level)

        # Replace the import function
        builtins.__import__ = restricted_import

    def create_secure_temp_directory(self) -> str:
        """Create a secure temporary directory with restricted permissions"""

        try:
            # Create temporary directory
            temp_dir = tempfile.mkdtemp(prefix='lambda_secure_')

            # Set restrictive permissions (owner only)
            os.chmod(temp_dir, 0o700)

            logger.info(f"Secure temp directory created: {temp_dir}")
            return temp_dir

        except Exception as e:
            logger.error(f"Failed to create secure temp directory: {e}")
            raise

    def validate_file_access(self, file_path: str, operation: str) -> bool:
        """Validate file access requests"""

        # Normalize path
        normalized_path = os.path.normpath(file_path)

        # Restricted paths
        restricted_paths = [
            '/etc/', '/bin/', '/sbin/', '/usr/bin/',
            '/usr/sbin/', '/boot/', '/dev/', '/sys/'
        ]

        # Check for path traversal
        if '..' in normalized_path:
            security_monitor.log_security_event(
                'PATH_TRAVERSAL_ATTEMPT',
                {
                    'file_path': file_path,
                    'normalized_path': normalized_path,
                    'operation': operation
                },
                'HIGH'
            )
            return False

        # Check restricted paths
        for restricted in restricted_paths:
            if normalized_path.startswith(restricted):
                security_monitor.log_security_event(
                    'RESTRICTED_PATH_ACCESS',
                    {
                        'file_path': file_path,
                        'restricted_path': restricted,
                        'operation': operation
                    },
                    'HIGH'
                )
                return False

        return True

    def secure_file_open(self, file_path: str, mode: str = 'r', **kwargs):
        """Secure wrapper for file opening"""

        # Validate file access
        if not self.validate_file_access(file_path, f'open_{mode}'):
            raise PermissionError(f"Access denied to {file_path}")

        # Log file access
        security_monitor.log_security_event(
            'FILE_ACCESS',
            {
                'file_path': file_path,
                'mode': mode,
                'caller': security_monitor.get_caller_info()
            },
            'INFO'
        )

        # Use original open function
        return security_monitor.original_open(file_path, mode, **kwargs)

class RuntimeIntegrityChecker:
    """Check runtime integrity and detect tampering"""

    def __init__(self):
        # Store initial state checksums
        self.initial_checksums = self.calculate_runtime_checksums()

    def calculate_runtime_checksums(self) -> Dict[str, str]:
        """Calculate checksums of critical runtime components"""

        checksums = {}

        try:
            # Python executable checksum
            python_path = sys.executable
            if os.path.exists(python_path):
                with open(python_path, 'rb') as f:
                    content = f.read()
                    checksums['python_executable'] = hashlib.sha256(content).hexdigest()

            # Critical module checksums
            critical_modules = ['os', 'sys', 'subprocess', 'importlib']

            for module_name in critical_modules:
                if module_name in sys.modules:
                    module = sys.modules[module_name]
                    if hasattr(module, '__file__') and module.__file__:
                        try:
                            with open(module.__file__, 'rb') as f:
                                content = f.read()
                                checksums[f'module_{module_name}'] = hashlib.sha256(content).hexdigest()
                        except:
                            pass

        except Exception as e:
            logger.error(f"Failed to calculate runtime checksums: {e}")

        return checksums

    def verify_runtime_integrity(self) -> Dict[str, Any]:
        """Verify runtime integrity against initial state"""

        current_checksums = self.calculate_runtime_checksums()
        integrity_report = {
            'timestamp': time.time(),
            'integrity_violations': [],
            'new_components': [],
            'missing_components': [],
            'overall_status': 'CLEAN'
        }

        # Check for modifications
        for component, initial_checksum in self.initial_checksums.items():
            if component not in current_checksums:
                integrity_report['missing_components'].append(component)
                integrity_report['overall_status'] = 'COMPROMISED'

            elif current_checksums[component] != initial_checksum:
                integrity_report['integrity_violations'].append({
                    'component': component,
                    'initial_checksum': initial_checksum,
                    'current_checksum': current_checksums[component]
                })
                integrity_report['overall_status'] = 'COMPROMISED'

        # Check for new components
        for component in current_checksums:
            if component not in self.initial_checksums:
                integrity_report['new_components'].append(component)
                integrity_report['overall_status'] = 'MODIFIED'

        # Log integrity violations
        if integrity_report['overall_status'] != 'CLEAN':
            security_monitor.log_security_event(
                'RUNTIME_INTEGRITY_VIOLATION',
                integrity_report,
                'CRITICAL'
            )

        return integrity_report

# Global hardening manager
hardening_manager = RuntimeHardeningManager()
integrity_checker = RuntimeIntegrityChecker()

# Replace built-in open with the secure version
builtins.open = hardening_manager.secure_file_open
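
The hardening helpers can also be used directly from application code. A short sketch, assuming the module layout above; the file name and example paths are illustrative.

# hardening_usage_example.py - illustrative use of the hardening helpers
from runtime_hardening import hardening_manager

# Work inside an owner-only scratch directory instead of writing straight to /tmp
scratch_dir = hardening_manager.create_secure_temp_directory()

# Validated file access: traversal attempts and system paths are rejected and logged
assert hardening_manager.validate_file_access(f'{scratch_dir}/report.json', 'open_w')
assert not hardening_manager.validate_file_access('/etc/shadow', 'open_r')

# Regular open() calls are now routed through secure_file_open by the hook above
with open(f'{scratch_dir}/report.json', 'w') as f:
    f.write('{"status": "ok"}')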

Runtime Incident Response System

# runtime_incident_response.py - Automated incident response for runtime threats
import json
import os
import time
import boto3
from boto3.dynamodb.conditions import Attr
from decimal import Decimal
from datetime import datetime, timedelta
from typing import Dict, Any, List
from dataclasses import dataclass
from enum import Enum
import logging

# Handler, integrity checker and monitor from the previous modules
from runtime_threat_analysis import secure_lambda_handler
from runtime_hardening import integrity_checker

logger = logging.getLogger()

class ThreatLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class ResponseAction(Enum):
    MONITOR = "monitor"
    ALERT = "alert"
    QUARANTINE = "quarantine"
    TERMINATE = "terminate"

@dataclass
class SecurityIncident:
    id: str
    timestamp: float
    threat_level: ThreatLevel
    event_type: str
    details: Dict[str, Any]
    affected_resources: List[str]
    response_actions: List[ResponseAction]

class RuntimeIncidentResponse:
    """Automated incident response system for runtime security threats"""

    def __init__(self):
        self.incident_store = boto3.resource('dynamodb').Table(
            os.environ.get('INCIDENT_TABLE', 'runtime-incidents')
        )

        self.sns_client = boto3.client('sns')
        self.lambda_client = boto3.client('lambda')

        # Response playbooks
        self.response_playbooks = {
            'CODE_INJECTION': {
                ThreatLevel.HIGH: [ResponseAction.QUARANTINE, ResponseAction.ALERT],
                ThreatLevel.CRITICAL: [ResponseAction.TERMINATE, ResponseAction.ALERT]
            },
            'COMMAND_INJECTION': {
                ThreatLevel.HIGH: [ResponseAction.QUARANTINE, ResponseAction.ALERT],
                ThreatLevel.CRITICAL: [ResponseAction.TERMINATE, ResponseAction.ALERT]
            },
            'PATH_TRAVERSAL_ATTEMPT': {
                ThreatLevel.MEDIUM: [ResponseAction.MONITOR, ResponseAction.ALERT],
                ThreatLevel.HIGH: [ResponseAction.QUARANTINE, ResponseAction.ALERT]
            },
            'RUNTIME_INTEGRITY_VIOLATION': {
                ThreatLevel.CRITICAL: [ResponseAction.TERMINATE, ResponseAction.ALERT]
            },
            'RESOURCE_LIMIT_EXCEEDED': {
                ThreatLevel.HIGH: [ResponseAction.QUARANTINE, ResponseAction.ALERT]
            }
        }

        # Incident correlation rules
        self.correlation_rules = [
            {
                'name': 'Multiple injection attempts',
                'conditions': {
                    'event_types': ['CODE_INJECTION', 'COMMAND_INJECTION'],
                    'count': 3,
                    'time_window': 300  # 5 minutes
                },
                'escalation': ThreatLevel.CRITICAL
            },
            {
                'name': 'Rapid security events',
                'conditions': {
                    'any_security_event': True,
                    'count': 10,
                    'time_window': 60  # 1 minute
                },
                'escalation': ThreatLevel.HIGH
            }
        ]

    def process_security_event(self, event_type: str, details: Dict[str, Any],
                             severity: str, context: Dict[str, Any]) -> SecurityIncident:
        """Process security event and trigger incident response"""

        # Map severity to threat level
        threat_level_mapping = {
            'LOW': ThreatLevel.LOW,
            'MEDIUM': ThreatLevel.MEDIUM,
            'HIGH': ThreatLevel.HIGH,
            'CRITICAL': ThreatLevel.CRITICAL
        }

        threat_level = threat_level_mapping.get(severity, ThreatLevel.LOW)

        # Create incident
        incident = SecurityIncident(
            id=self.generate_incident_id(event_type, context),
            timestamp=time.time(),
            threat_level=threat_level,
            event_type=event_type,
            details=details,
            affected_resources=[
                context.get('function_name', 'unknown'),
                context.get('aws_request_id', 'unknown')
            ],
            response_actions=[]
        )

        # Apply correlation rules
        incident = self.apply_correlation_rules(incident)

        # Determine response actions
        incident.response_actions = self.determine_response_actions(
            incident.event_type,
            incident.threat_level
        )

        # Execute response actions
        self.execute_response_actions(incident)

        # Store incident
        self.store_incident(incident)

        return incident

    def apply_correlation_rules(self, incident: SecurityIncident) -> SecurityIncident:
        """Apply correlation rules to detect attack patterns"""

        current_time = time.time()

        for rule in self.correlation_rules:
            conditions = rule['conditions']
            time_window = conditions.get('time_window', 300)
            required_count = conditions.get('count', 1)

            # Query recent incidents
            recent_incidents = self.get_recent_incidents(
                time_window,
                conditions.get('event_types', [incident.event_type])
            )

            if len(recent_incidents) >= required_count:
                # Escalate using an explicit ordering; the enum values are strings,
                # so comparing them directly would not reflect severity
                severity_order = {
                    ThreatLevel.LOW: 0, ThreatLevel.MEDIUM: 1,
                    ThreatLevel.HIGH: 2, ThreatLevel.CRITICAL: 3
                }
                escalation_level = rule['escalation']

                if severity_order[escalation_level] > severity_order[incident.threat_level]:
                    logger.warning(f"Escalating incident due to rule: {rule['name']}")

                    incident.threat_level = escalation_level
                    incident.details['escalation_rule'] = rule['name']
                    incident.details['correlated_incidents'] = [
                        i['id'] for i in recent_incidents
                    ]

        return incident

    def determine_response_actions(self, event_type: str, threat_level: ThreatLevel) -> List[ResponseAction]:
        """Determine appropriate response actions based on threat"""

        playbook = self.response_playbooks.get(event_type, {})
        actions = playbook.get(threat_level, [ResponseAction.MONITOR])

        return actions

    def execute_response_actions(self, incident: SecurityIncident):
        """Execute response actions for the incident"""

        for action in incident.response_actions:
            try:
                if action == ResponseAction.MONITOR:
                    self.action_monitor(incident)
                elif action == ResponseAction.ALERT:
                    self.action_alert(incident)
                elif action == ResponseAction.QUARANTINE:
                    self.action_quarantine(incident)
                elif action == ResponseAction.TERMINATE:
                    self.action_terminate(incident)

                logger.info(f"Executed response action: {action.value} for incident {incident.id}")

            except Exception as e:
                logger.error(f"Failed to execute action {action.value}: {e}")

    def action_monitor(self, incident: SecurityIncident):
        """Enhanced monitoring action"""

        # Increase logging verbosity
        logger.setLevel(logging.DEBUG)

        # Record that enhanced monitoring is active (informational events are
        # logged rather than routed through the alerting channel)
        logger.info(
            "Security incident monitoring activated: %s %s",
            incident.id,
            json.dumps(incident.details, default=str)
        )

    def action_alert(self, incident: SecurityIncident):
        """Send security alerts"""

        alert_message = {
            'incident_id': incident.id,
            'threat_level': incident.threat_level.value,
            'event_type': incident.event_type,
            'timestamp': incident.timestamp,
            'affected_resources': incident.affected_resources,
            'details': incident.details
        }

        # Send to SNS topic
        topic_arn = os.environ.get('SECURITY_ALERTS_TOPIC')
        if topic_arn:
            self.sns_client.publish(
                TopicArn=topic_arn,
                Subject=f'Runtime Security Alert: {incident.event_type}',
                Message=json.dumps(alert_message, indent=2)
            )

        # Send to Slack/Teams webhook if configured
        webhook_url = os.environ.get('SECURITY_WEBHOOK_URL')
        if webhook_url:
            self.send_webhook_alert(webhook_url, alert_message)

    def action_quarantine(self, incident: SecurityIncident):
        """Quarantine affected Lambda function"""

        function_name = incident.affected_resources[0]

        try:
            # update_function_configuration replaces the entire environment variable
            # map, so merge the quarantine flags with the existing variables
            current_config = self.lambda_client.get_function_configuration(
                FunctionName=function_name
            )
            env_vars = current_config.get('Environment', {}).get('Variables', {})
            env_vars.update({
                'QUARANTINE_MODE': 'true',
                'QUARANTINE_REASON': incident.event_type,
                'QUARANTINE_TIMESTAMP': str(incident.timestamp)
            })

            self.lambda_client.update_function_configuration(
                FunctionName=function_name,
                Environment={'Variables': env_vars}
            )

            logger.warning(f"Function {function_name} has been quarantined")

        except Exception as e:
            logger.error(f"Failed to quarantine function {function_name}: {e}")

    def action_terminate(self, incident: SecurityIncident):
        """Terminate current execution and prevent further invocations"""

        function_name = incident.affected_resources[0]

        try:
            # Set reserved concurrency to 0 to prevent new invocations
            self.lambda_client.put_function_concurrency(
                FunctionName=function_name,
                ReservedConcurrentExecutions=0
            )

            logger.critical(f"Function {function_name} has been terminated")

            # Terminate current execution
            os._exit(1)

        except Exception as e:
            logger.error(f"Failed to terminate function {function_name}: {e}")

    def get_recent_incidents(self, time_window: int, event_types: List[str]) -> List[Dict]:
        """Get recent incidents for correlation analysis"""

        cutoff_time = time.time() - time_window

        try:
            # "timestamp" is a DynamoDB reserved word, so alias it in the projection;
            # numeric values must be passed as Decimal rather than float
            response = self.incident_store.scan(
                FilterExpression=Attr('timestamp').gt(Decimal(str(cutoff_time))),
                ProjectionExpression='id, event_type, #ts, threat_level',
                ExpressionAttributeNames={'#ts': 'timestamp'}
            )

            incidents = []
            for item in response.get('Items', []):
                if item['event_type'] in event_types:
                    incidents.append(item)

            return incidents

        except Exception as e:
            logger.error(f"Failed to query recent incidents: {e}")
            return []

    def store_incident(self, incident: SecurityIncident):
        """Store incident in DynamoDB for analysis"""

        try:
            # DynamoDB rejects Python floats, so convert numeric values to Decimal
            item = {
                'id': incident.id,
                'timestamp': Decimal(str(incident.timestamp)),
                'threat_level': incident.threat_level.value,
                'event_type': incident.event_type,
                'details': json.loads(json.dumps(incident.details, default=str),
                                      parse_float=Decimal),
                'affected_resources': incident.affected_resources,
                'response_actions': [action.value for action in incident.response_actions],
                'ttl': int(time.time() + (90 * 24 * 3600))  # 90 days retention
            }

            self.incident_store.put_item(Item=item)

        except Exception as e:
            logger.error(f"Failed to store incident: {e}")

    def generate_incident_id(self, event_type: str, context: Dict[str, Any]) -> str:
        """Generate unique incident ID"""

        timestamp = int(time.time() * 1000)
        function_name = context.get('function_name', 'unknown')
        request_id = context.get('aws_request_id', 'unknown')[:8]

        return f"{event_type}_{function_name}_{request_id}_{timestamp}"

    def send_webhook_alert(self, webhook_url: str, alert_data: Dict[str, Any]):
        """Send alert via webhook"""

        try:
            import requests

            payload = {
                "text": f"🚨 Runtime Security Alert: {alert_data['event_type']}",
                "attachments": [{
                    "color": "danger" if alert_data['threat_level'] in ['high', 'critical'] else "warning",
                    "fields": [
                        {
                            "title": "Incident ID",
                            "value": alert_data['incident_id'],
                            "short": True
                        },
                        {
                            "title": "Threat Level",
                            "value": alert_data['threat_level'].upper(),
                            "short": True
                        },
                        {
                            "title": "Affected Resources",
                            "value": ", ".join(alert_data['affected_resources']),
                            "short": False
                        }
                    ]
                }]
            }

            requests.post(webhook_url, json=payload, timeout=5)

        except Exception as e:
            logger.error(f"Failed to send webhook alert: {e}")

# Global incident response system
incident_response = RuntimeIncidentResponse()

# Integration with security monitor
def enhanced_security_event_handler(event_type: str, details: Dict[str, Any],
                                   severity: str, context: Dict[str, Any]):
    """Enhanced security event handler with incident response"""

    # Process through incident response system
    incident = incident_response.process_security_event(
        event_type, details, severity, context
    )

    logger.info(f"Security incident created: {incident.id} ({incident.threat_level.value})")

    return incident

def hardened_lambda_handler(event, context):
    """Production-ready Lambda handler with comprehensive runtime security"""

    # Check if function is quarantined
    if os.environ.get('QUARANTINE_MODE') == 'true':
        return {
            'statusCode': 503,
            'body': json.dumps({
                'error': 'Function quarantined due to security incident',
                'quarantine_reason': os.environ.get('QUARANTINE_REASON', 'unknown'),
                'quarantine_timestamp': os.environ.get('QUARANTINE_TIMESTAMP', 'unknown')
            })
        }

    # Verify runtime integrity
    integrity_report = integrity_checker.verify_runtime_integrity()
    if integrity_report['overall_status'] == 'COMPROMISED':
        enhanced_security_event_handler(
            'RUNTIME_INTEGRITY_VIOLATION',
            integrity_report,
            'CRITICAL',
            {'function_name': context.function_name, 'aws_request_id': context.aws_request_id}
        )

    try:
        # Process request with full security monitoring
        return secure_lambda_handler(event, context)

    except Exception as e:
        # Handle security exceptions
        enhanced_security_event_handler(
            'RUNTIME_EXCEPTION',
            {
                'error': str(e),
                'error_type': type(e).__name__
            },
            'HIGH',
            {'function_name': context.function_name, 'aws_request_id': context.aws_request_id}
        )

        raise
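
The pipeline can be exercised by feeding it a synthetic event. The sketch below assumes the INCIDENT_TABLE DynamoDB table and SECURITY_ALERTS_TOPIC SNS topic referenced above already exist; the function name, request ID, and file name are made-up examples.

# incident_response_example.py - feeding a synthetic event through the pipeline (illustrative)
from runtime_incident_response import incident_response

incident = incident_response.process_security_event(
    event_type='CODE_INJECTION',
    details={'function': 'eval', 'expression': "__import__('os').system('id')"},
    severity='HIGH',
    context={'function_name': 'payments-api', 'aws_request_id': 'abc123de-0000-4000-8000-000000000000'}
)

# With the default playbook, a HIGH CODE_INJECTION event maps to QUARANTINE and ALERT
print(incident.id, incident.threat_level.value,
      [action.value for action in incident.response_actions])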

Conclusion: Securing the Serverless Runtime

Runtime security in serverless environments requires a fundamentally different approach than traditional infrastructure security. The ephemeral nature of serverless execution environments, combined with the shared responsibility model, creates unique challenges that demand innovative solutions.

Key Principles for Serverless Runtime Security:

  1. Defense in Depth: Layer multiple security controls throughout the runtime environment
  2. Real-time Monitoring: Implement continuous monitoring for anomalous behavior and threats
  3. Automated Response: Build automated incident response capabilities that can react faster than human operators
  4. Integrity Verification: Continuously verify the integrity of the runtime environment and code
  5. Least Privilege Execution: Apply strict resource limits and access controls at the runtime level

Critical Implementation Areas:

  • Runtime Hardening: Implement resource limits, module restrictions, and access controls
  • Threat Detection: Deploy real-time monitoring for injection attacks, privilege escalation, and anomalous behavior
  • Incident Response: Build automated response capabilities that can quarantine or terminate compromised functions
  • Integrity Monitoring: Continuously verify runtime integrity and detect tampering attempts
  • Security Logging: Implement comprehensive logging and alerting for all security-relevant events

The future of serverless security lies in building intelligent, self-defending runtime environments that can detect, respond to, and recover from sophisticated attacks without human intervention. As serverless computing continues to evolve, so too must our approach to securing these invisible, ephemeral execution environments.

Remember: in serverless computing, the runtime is your last line of defense. Make it count.