Django Security Arsenal: Custom Middleware for Modern Threat Protection

Django’s built-in security features are robust, but modern web applications face sophisticated threats that require advanced protection mechanisms. Custom middleware provides the perfect layer to implement security controls that operate on every request before it reaches your views.

This guide will show you how to build a comprehensive security middleware stack that protects against injection attacks, implements advanced authentication, prevents abuse, and monitors security events in real-time.

Understanding Django Middleware Security Architecture

Middleware Execution Order

Django processes middleware in a specific order during request/response cycles:

# settings.py - Security-focused middleware stack
MIDDLEWARE = [
    # 1. Security headers (first line of defense)
    'myapp.middleware.SecurityHeadersMiddleware',

    # 2. Rate limiting (prevent abuse)
    'myapp.middleware.RateLimitMiddleware',

    # 3. IP filtering (block malicious IPs)
    'myapp.middleware.IPFilterMiddleware',

    # 4. Request validation (sanitize input)
    'myapp.middleware.RequestValidationMiddleware',

    # Django's built-in security
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',

    # 5. Custom authentication (JWT/API tokens)
    'myapp.middleware.JWTAuthenticationMiddleware',

    # Django's auth middleware
    'django.contrib.auth.middleware.AuthenticationMiddleware',

    # 6. Audit logging (security monitoring)
    'myapp.middleware.SecurityAuditMiddleware',

    # Standard Django middleware
    'django.middleware.common.CommonMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

Base Security Middleware Class

# middleware/base.py
import logging
import time
from django.http import JsonResponse, HttpResponseForbidden
from django.core.cache import cache

logger = logging.getLogger('security')

class BaseSecurityMiddleware:
    """Base class for security middleware with common utilities"""

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Pre-processing
        start_time = time.time()

        # Security check
        security_result = self.security_check(request)
        if security_result is not None:
            self.log_security_event(request, security_result)
            return security_result

        # Process request
        response = self.get_response(request)

        # Post-processing
        response = self.post_process(request, response)

        # Log timing
        processing_time = time.time() - start_time
        if processing_time > 1.0:  # Log slow requests
            logger.warning(
                f'Slow security processing: {processing_time:.2f}s '
                f'for {request.path}'
            )

        return response

    def security_check(self, request):
        """Override in subclasses for specific security checks"""
        return None

    def post_process(self, request, response):
        """Override in subclasses for response modification"""
        return response

    def log_security_event(self, request, event_data):
        """Log security events for monitoring"""
        logger.warning(
            f'Security event: {event_data.get("type", "unknown")} '
            f'IP: {self.get_client_ip(request)} '
            f'Path: {request.path} '
            f'User-Agent: {request.META.get("HTTP_USER_AGENT", "unknown")}'
        )

    def get_client_ip(self, request):
        """Get real client IP address"""
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR', 'unknown')

    def is_api_request(self, request):
        """Check if request is for API endpoints"""
        return (
            request.path.startswith('/api/') or
            request.content_type == 'application/json' or
            'application/json' in request.META.get('HTTP_ACCEPT', '')
        )

    def create_security_response(self, message, status_code=403):
        """Create consistent security response"""
        return JsonResponse({
            'error': 'Security violation',
            'message': message,
            'status': status_code
        }, status=status_code)

Advanced JWT Authentication Middleware

JWT Security Implementation

# middleware/jwt_auth.py
import jwt
import json
from datetime import datetime, timedelta
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.http import JsonResponse
from .base import BaseSecurityMiddleware

User = get_user_model()

class JWTAuthenticationMiddleware(BaseSecurityMiddleware):
    """Advanced JWT authentication with security features"""

    def __init__(self, get_response):
        super().__init__(get_response)
        self.secret_key = settings.SECRET_KEY
        self.algorithm = 'HS256'
        self.token_blacklist_key = 'jwt_blacklist'

    def security_check(self, request):
        # Skip non-API requests
        if not self.is_api_request(request):
            return None

        # Extract JWT token
        token = self.extract_token(request)
        if not token:
            return self.create_security_response(
                'Authentication required',
                status_code=401
            )

        # Validate token
        try:
            payload = self.validate_token(token)
            user = self.get_user_from_payload(payload)

            if not user:
                return self.create_security_response(
                    'Invalid user',
                    status_code=401
                )

            # Security checks
            if not self.perform_security_checks(request, user, payload):
                return self.create_security_response(
                    'Security validation failed',
                    status_code=403
                )

            # Attach user to request
            request.user = user
            request.jwt_payload = payload

        except jwt.ExpiredSignatureError:
            return self.create_security_response(
                'Token expired',
                status_code=401
            )
        except jwt.InvalidTokenError as e:
            return self.create_security_response(
                f'Invalid token: {str(e)}',
                status_code=401
            )
        except Exception as e:
            logger.error(f'JWT validation error: {str(e)}')
            return self.create_security_response(
                'Authentication error',
                status_code=500
            )

        return None

    def extract_token(self, request):
        """Extract JWT token from request"""
        # Try Authorization header first
        auth_header = request.META.get('HTTP_AUTHORIZATION', '')
        if auth_header.startswith('Bearer '):
            return auth_header.split(' ')[1]

        # Try custom header
        token = request.META.get('HTTP_X_AUTH_TOKEN')
        if token:
            return token

        # Try POST data (less secure, for legacy support)
        if request.method == 'POST':
            try:
                data = json.loads(request.body)
                return data.get('token')
            except (json.JSONDecodeError, AttributeError):
                pass

        return None

    def validate_token(self, token):
        """Validate JWT token with comprehensive checks"""
        # Check blacklist first
        if self.is_token_blacklisted(token):
            raise jwt.InvalidTokenError('Token is blacklisted')

        # Decode and validate
        payload = jwt.decode(
            token,
            self.secret_key,
            algorithms=[self.algorithm],
            options={
                'verify_signature': True,
                'verify_exp': True,
                'verify_iat': True,
                'verify_nbf': True,
                'require': ['exp', 'iat', 'user_id']
            }
        )

        # Additional security validations
        self.validate_payload_security(payload)

        return payload

    def validate_payload_security(self, payload):
        """Additional security validations for JWT payload"""
        now = datetime.utcnow().timestamp()

        # Check token age (max 24 hours for access tokens)
        issued_at = payload.get('iat', 0)
        if now - issued_at > 86400:  # 24 hours
            raise jwt.InvalidTokenError('Token too old')

        # Check for required claims
        required_claims = ['user_id', 'exp', 'iat']
        for claim in required_claims:
            if claim not in payload:
                raise jwt.InvalidTokenError(f'Missing required claim: {claim}')

        # Validate token type
        token_type = payload.get('token_type', 'access')
        if token_type != 'access':
            raise jwt.InvalidTokenError('Invalid token type')

    def get_user_from_payload(self, payload):
        """Get user from JWT payload with caching"""
        user_id = payload.get('user_id')
        if not user_id:
            return None

        # Try cache first
        cache_key = f'jwt_user_{user_id}'
        user = cache.get(cache_key)

        if user is None:
            try:
                user = User.objects.get(id=user_id, is_active=True)
                # Cache for 5 minutes
                cache.set(cache_key, user, 300)
            except User.DoesNotExist:
                return None

        return user

    def perform_security_checks(self, request, user, payload):
        """Perform additional security checks"""
        client_ip = self.get_client_ip(request)

        # Check for suspicious login locations
        if not self.validate_ip_location(user, client_ip):
            logger.warning(
                f'Suspicious login location for user {user.id} '
                f'from IP {client_ip}'
            )

        # Check for concurrent session limits
        if not self.check_concurrent_sessions(user, payload):
            logger.warning(
                f'Too many concurrent sessions for user {user.id}'
            )
            return False

        # Check user permissions
        if not self.validate_user_permissions(user, request):
            return False

        # Update last activity
        self.update_user_activity(user, client_ip)

        return True

    def validate_ip_location(self, user, client_ip):
        """Validate IP location against user's typical locations"""
        # Implementation would integrate with GeoIP service
        # This is a simplified version
        cache_key = f'user_ips_{user.id}'
        known_ips = cache.get(cache_key, set())

        if client_ip not in known_ips and len(known_ips) > 0:
            # New IP detected - could trigger additional security measures
            # For now, just log it
            logger.info(f'New IP {client_ip} for user {user.id}')

        # Add IP to known IPs
        known_ips.add(client_ip)
        cache.set(cache_key, known_ips, 86400 * 7)  # 7 days

        return True

    def check_concurrent_sessions(self, user, payload):
        """Check concurrent session limits"""
        session_key = f'user_sessions_{user.id}'
        sessions = cache.get(session_key, set())

        token_id = payload.get('jti', payload.get('iat'))
        sessions.add(token_id)

        # Limit to 5 concurrent sessions
        if len(sessions) > 5:
            # Remove oldest sessions (simplified)
            sessions = set(list(sessions)[-5:])

        cache.set(session_key, sessions, 86400)
        return True

    def validate_user_permissions(self, user, request):
        """Validate user has necessary permissions"""
        # Check if user account is locked
        if hasattr(user, 'is_locked') and user.is_locked:
            return False

        # Check if user requires password change
        if hasattr(user, 'password_change_required') and user.password_change_required:
            # Allow only password change endpoints
            allowed_paths = ['/api/auth/change-password/', '/api/auth/logout/']
            if request.path not in allowed_paths:
                return False

        return True

    def update_user_activity(self, user, client_ip):
        """Update user activity tracking"""
        activity_key = f'user_activity_{user.id}'
        activity_data = {
            'last_seen': datetime.utcnow().isoformat(),
            'last_ip': client_ip,
            'request_count': cache.get(f'user_requests_{user.id}', 0) + 1
        }
        cache.set(activity_key, activity_data, 86400)
        cache.set(f'user_requests_{user.id}', activity_data['request_count'], 3600)

    def is_token_blacklisted(self, token):
        """Check if token is blacklisted"""
        blacklist = cache.get(self.token_blacklist_key, set())
        token_hash = hash(token)  # Use hash to save memory
        return token_hash in blacklist

    def blacklist_token(self, token):
        """Add token to blacklist"""
        blacklist = cache.get(self.token_blacklist_key, set())
        blacklist.add(hash(token))
        cache.set(self.token_blacklist_key, blacklist, 86400)

# Utility function for views to blacklist tokens
def blacklist_current_token(request):
    """Utility to blacklist current JWT token (for logout)"""
    if hasattr(request, 'jwt_payload'):
        from .middleware.jwt_auth import JWTAuthenticationMiddleware
        middleware = JWTAuthenticationMiddleware(None)

        # Extract token again (could be optimized by storing in request)
        token = middleware.extract_token(request)
        if token:
            middleware.blacklist_token(token)

Advanced Request Validation Middleware

Input Sanitization and XSS Prevention

# middleware/validation.py
import re
import json
import bleach
from urllib.parse import unquote
from django.http import JsonResponse
from .base import BaseSecurityMiddleware

class RequestValidationMiddleware(BaseSecurityMiddleware):
    """Advanced request validation and sanitization"""

    def __init__(self, get_response):
        super().__init__(get_response)

        # XSS patterns
        self.xss_patterns = [
            r'<script[^>]*>.*?</script>',
            r'javascript:',
            r'vbscript:',
            r'onload\s*=',
            r'onerror\s*=',
            r'onclick\s*=',
            r'onmouse\w+\s*=',
            r'<iframe[^>]*>',
            r'<object[^>]*>',
            r'<embed[^>]*>',
            r'<link[^>]*>',
            r'<meta[^>]*>',
        ]

        # SQL injection patterns
        self.sql_patterns = [
            r'(\b(ALTER|CREATE|DELETE|DROP|EXEC(UTE)?|INSERT|SELECT|UNION|UPDATE)\b)',
            r'(\b(OR|AND)\s+\d+\s*=\s*\d+)',
            r'(\b(OR|AND)\s+\w+\s*(LIKE|=)\s*\w+)',
            r'([\'"]\s*(OR|AND)\s+[\'"]\w+[\'"]\s*=\s*[\'"]\w+[\'"])',
            r'(UNION\s+SELECT)',
            r'(INFORMATION_SCHEMA|SYSOBJECTS|SYSCOLUMNS)',
        ]

        # Path traversal patterns
        self.path_traversal_patterns = [
            r'\.\./+',
            r'\.\.\\+',
            r'/etc/passwd',
            r'/etc/shadow',
            r'\.\.\..*/',
            r'%2e%2e%2f',
            r'%2e%2e\\',
        ]

        # Allowed HTML tags for content that needs basic formatting
        self.allowed_tags = [
            'p', 'br', 'strong', 'em', 'u', 'i', 'ul', 'ol', 'li',
            'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'blockquote', 'code', 'pre'
        ]

        self.allowed_attributes = {
            '*': ['class'],
            'a': ['href', 'title'],
            'img': ['src', 'alt', 'title', 'width', 'height']
        }

    def security_check(self, request):
        # Validate request size
        if not self.validate_request_size(request):
            return self.create_security_response(
                'Request too large',
                status_code=413
            )

        # Validate URL
        if not self.validate_url(request):
            return self.create_security_response(
                'Invalid request URL',
                status_code=400
            )

        # Validate headers
        if not self.validate_headers(request):
            return self.create_security_response(
                'Invalid request headers',
                status_code=400
            )

        # Sanitize request data
        if not self.sanitize_request_data(request):
            return self.create_security_response(
                'Request contains malicious content',
                status_code=400
            )

        return None

    def validate_request_size(self, request):
        """Validate request size limits"""
        # Check content length
        content_length = request.META.get('CONTENT_LENGTH', 0)
        try:
            content_length = int(content_length)
        except (ValueError, TypeError):
            content_length = 0

        # Standard limit: 10MB, JSON API: 1MB
        max_size = 1024 * 1024  # 1MB for API requests
        if not self.is_api_request(request):
            max_size = 10 * 1024 * 1024  # 10MB for regular requests

        if content_length > max_size:
            logger.warning(
                f'Request size limit exceeded: {content_length} bytes '
                f'from IP {self.get_client_ip(request)}'
            )
            return False

        return True

    def validate_url(self, request):
        """Validate URL for malicious patterns"""
        url = unquote(request.get_full_path())

        # Check for path traversal
        for pattern in self.path_traversal_patterns:
            if re.search(pattern, url, re.IGNORECASE):
                logger.warning(
                    f'Path traversal attempt: {url} '
                    f'from IP {self.get_client_ip(request)}'
                )
                return False

        # Check for encoded attacks
        if self._check_encoded_attacks(url):
            return False

        # URL length check
        if len(url) > 2048:
            logger.warning(f'Excessively long URL: {len(url)} characters')
            return False

        return True

    def validate_headers(self, request):
        """Validate HTTP headers for security issues"""
        dangerous_headers = [
            'HTTP_X_FORWARDED_HOST',
            'HTTP_X_FORWARDED_SERVER',
            'HTTP_HOST'
        ]

        for header_key in dangerous_headers:
            header_value = request.META.get(header_key, '')
            if header_value and not self._validate_header_value(header_value):
                logger.warning(
                    f'Malicious header {header_key}: {header_value} '
                    f'from IP {self.get_client_ip(request)}'
                )
                return False

        # Check User-Agent for common attack patterns
        user_agent = request.META.get('HTTP_USER_AGENT', '')
        if self._is_malicious_user_agent(user_agent):
            return False

        return True

    def sanitize_request_data(self, request):
        """Sanitize request data"""
        try:
            # Sanitize GET parameters
            if not self._sanitize_query_params(request):
                return False

            # Sanitize POST data
            if request.method in ['POST', 'PUT', 'PATCH']:
                if not self._sanitize_post_data(request):
                    return False

        except Exception as e:
            logger.error(f'Error sanitizing request data: {str(e)}')
            return False

        return True

    def _sanitize_query_params(self, request):
        """Sanitize URL query parameters"""
        for key, value in request.GET.items():
            if not self._validate_input_string(value):
                logger.warning(
                    f'Malicious query parameter {key}: {value} '
                    f'from IP {self.get_client_ip(request)}'
                )
                return False

            # Sanitize HTML content
            if self._contains_html(value):
                sanitized = self._sanitize_html(value)
                # Replace in request.GET (this modifies the QueryDict)
                request.GET._mutable = True
                request.GET[key] = sanitized
                request.GET._mutable = False

        return True

    def _sanitize_post_data(self, request):
        """Sanitize POST data"""
        content_type = request.content_type.lower()

        if content_type == 'application/json':
            return self._sanitize_json_data(request)
        elif content_type.startswith('application/x-www-form-urlencoded'):
            return self._sanitize_form_data(request)
        elif content_type.startswith('multipart/form-data'):
            return self._sanitize_multipart_data(request)

        return True

    def _sanitize_json_data(self, request):
        """Sanitize JSON request data"""
        try:
            if hasattr(request, '_body') and request._body:
                data = json.loads(request.body)
                sanitized_data = self._sanitize_dict_recursive(data)

                # Replace request body with sanitized data
                request._body = json.dumps(sanitized_data).encode('utf-8')

        except json.JSONDecodeError:
            logger.warning(f'Invalid JSON in request from IP {self.get_client_ip(request)}')
            return False
        except Exception as e:
            logger.error(f'Error sanitizing JSON data: {str(e)}')
            return False

        return True

    def _sanitize_form_data(self, request):
        """Sanitize form-encoded data"""
        for key, value in request.POST.items():
            if not self._validate_input_string(value):
                logger.warning(
                    f'Malicious POST parameter {key}: {value} '
                    f'from IP {self.get_client_ip(request)}'
                )
                return False

            # Sanitize HTML content
            if self._contains_html(value):
                sanitized = self._sanitize_html(value)
                request.POST._mutable = True
                request.POST[key] = sanitized
                request.POST._mutable = False

        return True

    def _sanitize_multipart_data(self, request):
        """Sanitize multipart form data"""
        # Note: This is simplified - production code would need
        # more sophisticated file upload validation

        for key, value in request.POST.items():
            if isinstance(value, str):
                if not self._validate_input_string(value):
                    return False

        # Validate uploaded files
        for key, uploaded_file in request.FILES.items():
            if not self._validate_uploaded_file(uploaded_file):
                return False

        return True

    def _sanitize_dict_recursive(self, data):
        """Recursively sanitize dictionary data"""
        if isinstance(data, dict):
            return {
                key: self._sanitize_dict_recursive(value)
                for key, value in data.items()
            }
        elif isinstance(data, list):
            return [self._sanitize_dict_recursive(item) for item in data]
        elif isinstance(data, str):
            if self._contains_html(data):
                return self._sanitize_html(data)
            return data
        else:
            return data

    def _validate_input_string(self, value):
        """Validate input string for malicious patterns"""
        if not isinstance(value, str):
            return True

        value_lower = value.lower()

        # Check XSS patterns
        for pattern in self.xss_patterns:
            if re.search(pattern, value_lower, re.IGNORECASE):
                return False

        # Check SQL injection patterns
        for pattern in self.sql_patterns:
            if re.search(pattern, value_lower, re.IGNORECASE):
                return False

        # Check for null bytes
        if '\x00' in value:
            return False

        return True

    def _contains_html(self, value):
        """Check if string contains HTML"""
        return bool(re.search(r'<[^>]+>', value))

    def _sanitize_html(self, value):
        """Sanitize HTML content"""
        return bleach.clean(
            value,
            tags=self.allowed_tags,
            attributes=self.allowed_attributes,
            strip=True
        )

    def _validate_header_value(self, header_value):
        """Validate HTTP header value"""
        # Check for CRLF injection
        if '\r' in header_value or '\n' in header_value:
            return False

        # Check for basic XSS in headers
        if re.search(r'<script|javascript:', header_value, re.IGNORECASE):
            return False

        return True

    def _is_malicious_user_agent(self, user_agent):
        """Check for malicious User-Agent patterns"""
        malicious_patterns = [
            r'sqlmap',
            r'nikto',
            r'netsparker',
            r'acunetix',
            r'burpsuite',
            r'grabber',
            r'w3af',
            r'havij',
            r'<script',
            r'javascript:',
        ]

        for pattern in malicious_patterns:
            if re.search(pattern, user_agent, re.IGNORECASE):
                logger.warning(f'Malicious User-Agent detected: {user_agent}')
                return True

        return False

    def _check_encoded_attacks(self, url):
        """Check for encoded attack patterns"""
        # Common encoded attack patterns
        encoded_patterns = [
            r'%3c%73%63%72%69%70%74',  # <script
            r'%22%3e%3c%73%63%72%69%70%74',  # "><script
            r'%27%3e%3c%73%63%72%69%70%74',  # '><script
        ]

        for pattern in encoded_patterns:
            if re.search(pattern, url, re.IGNORECASE):
                logger.warning(f'Encoded attack pattern in URL: {url}')
                return True

        return False

    def _validate_uploaded_file(self, uploaded_file):
        """Validate uploaded file"""
        # Check file size (10MB limit)
        if uploaded_file.size > 10 * 1024 * 1024:
            return False

        # Check file extension
        allowed_extensions = [
            '.jpg', '.jpeg', '.png', '.gif', '.pdf', '.doc', '.docx',
            '.xls', '.xlsx', '.txt', '.csv', '.zip'
        ]

        filename = uploaded_file.name.lower()
        if not any(filename.endswith(ext) for ext in allowed_extensions):
            logger.warning(f'Disallowed file extension: {filename}')
            return False

        # Check for malicious filenames
        if re.search(r'[<>:"/\\|?*]', uploaded_file.name):
            return False

        return True

    def post_process(self, request, response):
        """Add security headers to response"""
        # Add security headers
        security_headers = {
            'X-Content-Type-Options': 'nosniff',
            'X-Frame-Options': 'DENY',
            'X-XSS-Protection': '1; mode=block',
            'Referrer-Policy': 'strict-origin-when-cross-origin',
            'Content-Security-Policy': self._get_csp_header(request),
        }

        for header, value in security_headers.items():
            if header not in response:
                response[header] = value

        return response

    def _get_csp_header(self, request):
        """Generate Content Security Policy header"""
        # Customize based on your application needs
        csp_policies = [
            "default-src 'self'",
            "script-src 'self' 'unsafe-inline'",  # Adjust as needed
            "style-src 'self' 'unsafe-inline'",
            "img-src 'self' data: https:",
            "font-src 'self'",
            "connect-src 'self'",
            "frame-ancestors 'none'",
            "base-uri 'self'",
            "form-action 'self'"
        ]

        return '; '.join(csp_policies)

Rate Limiting and DDoS Protection

Intelligent Rate Limiting Middleware

# middleware/rate_limit.py
import time
import hashlib
from collections import defaultdict
from django.core.cache import cache
from django.http import HttpResponse
from .base import BaseSecurityMiddleware

class RateLimitMiddleware(BaseSecurityMiddleware):
    """Advanced rate limiting with multiple algorithms"""

    def __init__(self, get_response):
        super().__init__(get_response)

        # Rate limit configurations
        self.rate_limits = {
            # Per-IP limits
            'ip': {
                'requests': 100,
                'window': 60,  # 60 seconds
                'burst': 20    # Burst allowance
            },
            # Per-user limits (authenticated users)
            'user': {
                'requests': 500,
                'window': 60,
                'burst': 50
            },
            # Per-endpoint limits
            'endpoint': {
                '/api/auth/login/': {'requests': 5, 'window': 60},
                '/api/auth/register/': {'requests': 3, 'window': 300},
                '/api/password/reset/': {'requests': 2, 'window': 300},
            }
        }

        # Sliding window cache
        self.sliding_windows = defaultdict(list)

    def security_check(self, request):
        client_ip = self.get_client_ip(request)

        # Check IP-based rate limit
        if not self._check_ip_rate_limit(client_ip, request):
            return self._create_rate_limit_response('IP rate limit exceeded')

        # Check user-based rate limit (if authenticated)
        if hasattr(request, 'user') and request.user.is_authenticated:
            if not self._check_user_rate_limit(request.user.id, request):
                return self._create_rate_limit_response('User rate limit exceeded')

        # Check endpoint-specific rate limits
        if not self._check_endpoint_rate_limit(request.path, client_ip):
            return self._create_rate_limit_response('Endpoint rate limit exceeded')

        # Check for suspicious patterns
        if self._detect_attack_pattern(client_ip, request):
            return self._create_rate_limit_response('Suspicious activity detected')

        return None

    def _check_ip_rate_limit(self, client_ip, request):
        """Check IP-based rate limit using token bucket algorithm"""
        cache_key = f'rate_limit_ip_{client_ip}'
        bucket_data = cache.get(cache_key, {
            'tokens': self.rate_limits['ip']['requests'],
            'last_update': time.time()
        })

        current_time = time.time()
        time_passed = current_time - bucket_data['last_update']

        # Refill tokens based on time passed
        refill_rate = self.rate_limits['ip']['requests'] / self.rate_limits['ip']['window']
        tokens_to_add = time_passed * refill_rate
        bucket_data['tokens'] = min(
            self.rate_limits['ip']['requests'],
            bucket_data['tokens'] + tokens_to_add
        )
        bucket_data['last_update'] = current_time

        # Check if request can be allowed
        if bucket_data['tokens'] >= 1:
            bucket_data['tokens'] -= 1
            cache.set(cache_key, bucket_data, self.rate_limits['ip']['window'] * 2)
            return True
        else:
            # Log rate limit violation
            logger.warning(
                f'IP rate limit exceeded: {client_ip} '
                f'Path: {request.path}'
            )
            cache.set(cache_key, bucket_data, self.rate_limits['ip']['window'] * 2)
            return False

    def _check_user_rate_limit(self, user_id, request):
        """Check user-based rate limit using sliding window"""
        cache_key = f'rate_limit_user_{user_id}'
        window_data = cache.get(cache_key, [])

        current_time = time.time()
        window_start = current_time - self.rate_limits['user']['window']

        # Remove old entries
        window_data = [timestamp for timestamp in window_data if timestamp > window_start]

        # Check if limit exceeded
        if len(window_data) >= self.rate_limits['user']['requests']:
            logger.warning(
                f'User rate limit exceeded: {user_id} '
                f'Path: {request.path}'
            )
            return False

        # Add current request
        window_data.append(current_time)
        cache.set(cache_key, window_data, self.rate_limits['user']['window'])

        return True

    def _check_endpoint_rate_limit(self, path, client_ip):
        """Check endpoint-specific rate limits"""
        if path not in self.rate_limits['endpoint']:
            return True

        endpoint_config = self.rate_limits['endpoint'][path]
        cache_key = f'rate_limit_endpoint_{hashlib.md5(f"{path}_{client_ip}".encode()).hexdigest()}'

        window_data = cache.get(cache_key, [])
        current_time = time.time()
        window_start = current_time - endpoint_config['window']

        # Clean old entries
        window_data = [timestamp for timestamp in window_data if timestamp > window_start]

        # Check limit
        if len(window_data) >= endpoint_config['requests']:
            logger.warning(
                f'Endpoint rate limit exceeded: {path} '
                f'IP: {client_ip}'
            )
            return False

        # Add current request
        window_data.append(current_time)
        cache.set(cache_key, window_data, endpoint_config['window'])

        return True

    def _detect_attack_pattern(self, client_ip, request):
        """Detect potential attack patterns"""
        # Track request patterns
        pattern_key = f'attack_pattern_{client_ip}'
        patterns = cache.get(pattern_key, {
            'rapid_requests': 0,
            'error_requests': 0,
            'last_request_time': time.time(),
            'paths': []
        })

        current_time = time.time()

        # Check for rapid-fire requests (potential DoS)
        if current_time - patterns['last_request_time'] < 0.1:  # Less than 100ms
            patterns['rapid_requests'] += 1
        else:
            patterns['rapid_requests'] = max(0, patterns['rapid_requests'] - 1)

        # Check for path scanning
        patterns['paths'].append(request.path)
        if len(patterns['paths']) > 20:
            patterns['paths'] = patterns['paths'][-20:]  # Keep last 20

        # Detect scanning behavior (many different paths)
        unique_paths = len(set(patterns['paths']))
        if unique_paths > 15:  # More than 15 different paths recently
            logger.warning(
                f'Potential path scanning detected: {client_ip} '
                f'Unique paths: {unique_paths}'
            )
            cache.set(pattern_key, patterns, 300)
            return True

        # Detect rapid requests
        if patterns['rapid_requests'] > 10:
            logger.warning(
                f'Rapid request pattern detected: {client_ip} '
                f'Rapid requests: {patterns["rapid_requests"]}'
            )
            cache.set(pattern_key, patterns, 300)
            return True

        patterns['last_request_time'] = current_time
        cache.set(pattern_key, patterns, 300)
        return False

    def _create_rate_limit_response(self, message):
        """Create rate limit response with proper headers"""
        response = HttpResponse(
            content=f'Rate limit exceeded: {message}',
            status=429,
            content_type='text/plain'
        )

        # Add rate limit headers
        response['Retry-After'] = str(self.rate_limits['ip']['window'])
        response['X-RateLimit-Limit'] = str(self.rate_limits['ip']['requests'])
        response['X-RateLimit-Window'] = str(self.rate_limits['ip']['window'])

        return response

# Rate limit decorator for views
def rate_limit(requests_per_minute=60):
    """Decorator for additional view-level rate limiting"""
    def decorator(view_func):
        def wrapper(request, *args, **kwargs):
            client_ip = request.META.get('REMOTE_ADDR', 'unknown')
            cache_key = f'view_rate_limit_{view_func.__name__}_{client_ip}'

            # Simple counter-based rate limiting
            current_count = cache.get(cache_key, 0)
            if current_count >= requests_per_minute:
                return HttpResponse(
                    'Rate limit exceeded for this endpoint',
                    status=429
                )

            cache.set(cache_key, current_count + 1, 60)  # 1 minute window
            return view_func(request, *args, **kwargs)

        return wrapper
    return decorator

IP Filtering and Geoblocking

Advanced IP Security Middleware

# middleware/ip_filter.py
import ipaddress
import requests
from django.conf import settings
from django.core.cache import cache
from django.http import HttpResponseForbidden
from .base import BaseSecurityMiddleware

class IPFilterMiddleware(BaseSecurityMiddleware):
    """IP filtering with geolocation and threat intelligence"""

    def __init__(self, get_response):
        super().__init__(get_response)

        # IP whitelist (always allowed)
        self.whitelist = self._parse_ip_list(getattr(settings, 'IP_WHITELIST', []))

        # IP blacklist (always blocked)
        self.blacklist = self._parse_ip_list(getattr(settings, 'IP_BLACKLIST', []))

        # Blocked countries (ISO country codes)
        self.blocked_countries = getattr(settings, 'BLOCKED_COUNTRIES', [])

        # GeoIP service (configure your preferred service)
        self.geoip_service = getattr(settings, 'GEOIP_SERVICE', None)

    def security_check(self, request):
        client_ip = self.get_client_ip(request)

        # Skip checks for internal IPs
        if self._is_internal_ip(client_ip):
            return None

        # Check whitelist first
        if self._is_whitelisted(client_ip):
            return None

        # Check blacklist
        if self._is_blacklisted(client_ip):
            logger.warning(f'Blocked IP from blacklist: {client_ip}')
            return self._create_blocked_response('IP address is blocked')

        # Check threat intelligence
        if self._is_threat_ip(client_ip):
            logger.warning(f'Blocked threat IP: {client_ip}')
            return self._create_blocked_response('IP address flagged as threat')

        # Check geolocation
        if not self._check_geo_restrictions(client_ip):
            logger.warning(f'Blocked IP from restricted country: {client_ip}')
            return self._create_blocked_response('Access from your location is restricted')

        # Check for Tor/Proxy
        if self._is_tor_or_proxy(client_ip):
            logger.info(f'Tor/Proxy access detected: {client_ip}')
            # Decide based on your policy - block, challenge, or allow
            # For now, we'll allow but log it

        return None

    def _parse_ip_list(self, ip_list):
        """Parse IP addresses and CIDR ranges"""
        parsed_ips = []
        for ip_str in ip_list:
            try:
                if '/' in ip_str:
                    # CIDR range
                    parsed_ips.append(ipaddress.ip_network(ip_str, strict=False))
                else:
                    # Single IP
                    parsed_ips.append(ipaddress.ip_address(ip_str))
            except ValueError as e:
                logger.error(f'Invalid IP in configuration: {ip_str} - {e}')
        return parsed_ips

    def _is_internal_ip(self, ip_str):
        """Check if IP is internal/private"""
        try:
            ip = ipaddress.ip_address(ip_str)
            return ip.is_private or ip.is_loopback
        except ValueError:
            return False

    def _is_whitelisted(self, ip_str):
        """Check if IP is whitelisted"""
        try:
            ip = ipaddress.ip_address(ip_str)
            for allowed in self.whitelist:
                if isinstance(allowed, ipaddress.IPv4Address) or isinstance(allowed, ipaddress.IPv6Address):
                    if ip == allowed:
                        return True
                elif isinstance(allowed, ipaddress.IPv4Network) or isinstance(allowed, ipaddress.IPv6Network):
                    if ip in allowed:
                        return True
        except ValueError:
            pass
        return False

    def _is_blacklisted(self, ip_str):
        """Check if IP is blacklisted"""
        try:
            ip = ipaddress.ip_address(ip_str)
            for blocked in self.blacklist:
                if isinstance(blocked, ipaddress.IPv4Address) or isinstance(blocked, ipaddress.IPv6Address):
                    if ip == blocked:
                        return True
                elif isinstance(blocked, ipaddress.IPv4Network) or isinstance(blocked, ipaddress.IPv6Network):
                    if ip in blocked:
                        return True
        except ValueError:
            pass
        return False

    def _is_threat_ip(self, ip_str):
        """Check IP against threat intelligence feeds"""
        cache_key = f'threat_ip_{ip_str}'
        cached_result = cache.get(cache_key)

        if cached_result is not None:
            return cached_result == 'threat'

        # Check multiple threat intelligence sources
        is_threat = False

        # Example: Check against AbuseIPDB (requires API key)
        if hasattr(settings, 'ABUSEIPDB_API_KEY'):
            is_threat = self._check_abuseipdb(ip_str)

        # Cache result for 1 hour
        cache.set(cache_key, 'threat' if is_threat else 'clean', 3600)

        return is_threat

    def _check_abuseipdb(self, ip_str):
        """Check IP against AbuseIPDB"""
        try:
            url = 'https://api.abuseipdb.com/api/v2/check'
            headers = {
                'Key': settings.ABUSEIPDB_API_KEY,
                'Accept': 'application/json'
            }
            params = {
                'ipAddress': ip_str,
                'maxAgeInDays': 90,
                'verbose': ''
            }

            response = requests.get(url, headers=headers, params=params, timeout=5)
            if response.status_code == 200:
                data = response.json()
                abuse_confidence = data.get('data', {}).get('abuseConfidencePercentage', 0)

                # Consider IPs with >25% abuse confidence as threats
                return abuse_confidence > 25

        except requests.RequestException as e:
            logger.error(f'Error checking AbuseIPDB: {e}')

        return False

    def _check_geo_restrictions(self, ip_str):
        """Check IP against geographic restrictions"""
        if not self.blocked_countries:
            return True

        cache_key = f'geoip_{ip_str}'
        cached_country = cache.get(cache_key)

        if cached_country is None:
            cached_country = self._get_country_code(ip_str)
            # Cache for 24 hours
            cache.set(cache_key, cached_country or 'unknown', 86400)

        return cached_country not in self.blocked_countries

    def _get_country_code(self, ip_str):
        """Get country code for IP address"""
        # Example using ipapi.co (free tier available)
        try:
            response = requests.get(
                f'https://ipapi.co/{ip_str}/country/',
                timeout=5
            )
            if response.status_code == 200:
                return response.text.strip()
        except requests.RequestException as e:
            logger.error(f'Error getting country code: {e}')

        return None

    def _is_tor_or_proxy(self, ip_str):
        """Check if IP is from Tor network or known proxy"""
        cache_key = f'tor_proxy_{ip_str}'
        cached_result = cache.get(cache_key)

        if cached_result is not None:
            return cached_result == 'proxy'

        is_proxy = False

        # Check Tor exit nodes (example using public list)
        if self._check_tor_exit_nodes(ip_str):
            is_proxy = True

        # Check known proxy services
        if self._check_proxy_services(ip_str):
            is_proxy = True

        # Cache result for 6 hours
        cache.set(cache_key, 'proxy' if is_proxy else 'direct', 21600)

        return is_proxy

    def _check_tor_exit_nodes(self, ip_str):
        """Check against Tor exit node list"""
        # This would integrate with Tor's exit node list
        # For simplicity, returning False here
        return False

    def _check_proxy_services(self, ip_str):
        """Check against known proxy/VPN services"""
        # This would integrate with proxy detection services
        # For simplicity, returning False here
        return False

    def _create_blocked_response(self, message):
        """Create blocked IP response"""
        return HttpResponseForbidden(
            f'Access Denied: {message}',
            content_type='text/plain'
        )

Security Monitoring and Audit Logging

Comprehensive Security Audit Middleware

# middleware/security_audit.py
import json
import uuid
from datetime import datetime
from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from .base import BaseSecurityMiddleware

class SecurityAuditMiddleware(BaseSecurityMiddleware):
    """Comprehensive security event logging and monitoring"""

    def __init__(self, get_response):
        super().__init__(get_response)
        self.audit_enabled = getattr(settings, 'SECURITY_AUDIT_ENABLED', True)
        self.sensitive_fields = getattr(settings, 'AUDIT_SENSITIVE_FIELDS', [
            'password', 'token', 'secret', 'key', 'auth', 'session'
        ])

    def __call__(self, request):
        if not self.audit_enabled:
            return self.get_response(request)

        # Generate request ID for tracking
        request_id = str(uuid.uuid4())
        request.security_audit_id = request_id

        # Log request
        self._log_request(request, request_id)

        # Process request
        response = self.get_response(request)

        # Log response
        self._log_response(request, response, request_id)

        return response

    def _log_request(self, request, request_id):
        """Log security-relevant request data"""
        audit_data = {
            'event_type': 'http_request',
            'request_id': request_id,
            'timestamp': datetime.utcnow().isoformat(),
            'client_ip': self.get_client_ip(request),
            'user_id': getattr(request.user, 'id', None) if hasattr(request, 'user') else None,
            'user_agent': request.META.get('HTTP_USER_AGENT', ''),
            'method': request.method,
            'path': request.path,
            'query_params': self._sanitize_data(dict(request.GET)),
            'content_type': request.content_type,
            'content_length': request.META.get('CONTENT_LENGTH', 0),
            'referer': request.META.get('HTTP_REFERER', ''),
            'session_key': request.session.session_key if hasattr(request, 'session') else None,
        }

        # Add POST data for certain content types (sanitized)
        if request.method in ['POST', 'PUT', 'PATCH']:
            if request.content_type == 'application/json':
                try:
                    audit_data['post_data'] = self._sanitize_data(
                        json.loads(request.body)
                    )
                except (json.JSONDecodeError, UnicodeDecodeError):
                    audit_data['post_data'] = '<invalid_json>'
            elif request.content_type.startswith('application/x-www-form-urlencoded'):
                audit_data['post_data'] = self._sanitize_data(dict(request.POST))

        # Add security-relevant headers
        security_headers = [
            'HTTP_X_FORWARDED_FOR', 'HTTP_X_REAL_IP', 'HTTP_X_FORWARDED_PROTO',
            'HTTP_AUTHORIZATION', 'HTTP_X_REQUESTED_WITH', 'HTTP_ORIGIN',
            'HTTP_SEC_FETCH_SITE', 'HTTP_SEC_FETCH_MODE', 'HTTP_SEC_FETCH_DEST'
        ]

        headers = {}
        for header in security_headers:
            value = request.META.get(header)
            if value:
                # Sanitize authorization headers
                if 'AUTHORIZATION' in header:
                    headers[header] = self._mask_auth_header(value)
                else:
                    headers[header] = value

        audit_data['headers'] = headers

        self._write_audit_log(audit_data)

    def _log_response(self, request, response, request_id):
        """Log security-relevant response data"""
        audit_data = {
            'event_type': 'http_response',
            'request_id': request_id,
            'timestamp': datetime.utcnow().isoformat(),
            'status_code': response.status_code,
            'content_type': response.get('Content-Type', ''),
            'content_length': len(response.content) if hasattr(response, 'content') else 0,
        }

        # Log security headers
        security_headers = [
            'Content-Security-Policy', 'X-Frame-Options', 'X-Content-Type-Options',
            'X-XSS-Protection', 'Strict-Transport-Security', 'Set-Cookie'
        ]

        response_headers = {}
        for header in security_headers:
            value = response.get(header)
            if value:
                if header == 'Set-Cookie':
                    response_headers[header] = self._mask_cookie_data(value)
                else:
                    response_headers[header] = value

        audit_data['headers'] = response_headers

        # Log errors and security violations
        if response.status_code >= 400:
            audit_data['error_response'] = True

            # Additional context for security-related status codes
            if response.status_code == 401:
                audit_data['security_event'] = 'authentication_failure'
            elif response.status_code == 403:
                audit_data['security_event'] = 'authorization_failure'
            elif response.status_code == 429:
                audit_data['security_event'] = 'rate_limit_exceeded'

        self._write_audit_log(audit_data)

    def _sanitize_data(self, data):
        """Sanitize sensitive data for logging"""
        if isinstance(data, dict):
            sanitized = {}
            for key, value in data.items():
                if self._is_sensitive_field(key):
                    sanitized[key] = '<redacted>'
                elif isinstance(value, (dict, list)):
                    sanitized[key] = self._sanitize_data(value)
                else:
                    sanitized[key] = value
            return sanitized
        elif isinstance(data, list):
            return [self._sanitize_data(item) for item in data]
        else:
            return data

    def _is_sensitive_field(self, field_name):
        """Check if field name indicates sensitive data"""
        field_lower = field_name.lower()
        return any(sensitive in field_lower for sensitive in self.sensitive_fields)

    def _mask_auth_header(self, auth_header):
        """Mask authorization header for logging"""
        parts = auth_header.split(' ', 1)
        if len(parts) == 2:
            auth_type, credentials = parts
            # Show auth type but mask credentials
            return f"{auth_type} <redacted>"
        return "<redacted>"

    def _mask_cookie_data(self, cookie_value):
        """Mask cookie data for logging"""
        # Simple masking - show cookie names but hide values
        cookies = []
        for cookie_part in cookie_value.split(';'):
            if '=' in cookie_part:
                name, _ = cookie_part.strip().split('=', 1)
                cookies.append(f"{name}=<redacted>")
            else:
                cookies.append(cookie_part.strip())
        return '; '.join(cookies)

    def _write_audit_log(self, audit_data):
        """Write audit data to configured logging system"""
        # Log to Django logger
        logger.info(
            json.dumps(audit_data, cls=DjangoJSONEncoder),
            extra={'audit_data': audit_data}
        )

        # Optional: Send to external logging service
        if hasattr(settings, 'AUDIT_WEBHOOK_URL'):
            self._send_to_webhook(audit_data)

        # Optional: Store in database for complex queries
        if getattr(settings, 'AUDIT_STORE_IN_DB', False):
            self._store_in_database(audit_data)

    def _send_to_webhook(self, audit_data):
        """Send audit data to external webhook"""
        try:
            import requests
            requests.post(
                settings.AUDIT_WEBHOOK_URL,
                json=audit_data,
                timeout=5,
                headers={'Content-Type': 'application/json'}
            )
        except Exception as e:
            logger.error(f'Failed to send audit data to webhook: {e}')

    def _store_in_database(self, audit_data):
        """Store audit data in database"""
        try:
            from .models import SecurityAuditLog
            SecurityAuditLog.objects.create(
                event_type=audit_data['event_type'],
                data=audit_data,
                timestamp=datetime.fromisoformat(audit_data['timestamp']),
                client_ip=audit_data.get('client_ip'),
                user_id=audit_data.get('user_id'),
                request_id=audit_data.get('request_id')
            )
        except Exception as e:
            logger.error(f'Failed to store audit data in database: {e}')

# Models for database storage
# models.py
from django.db import models
from django.contrib.auth.models import User

class SecurityAuditLog(models.Model):
    EVENT_TYPES = [
        ('http_request', 'HTTP Request'),
        ('http_response', 'HTTP Response'),
        ('security_violation', 'Security Violation'),
        ('authentication', 'Authentication Event'),
    ]

    event_type = models.CharField(max_length=50, choices=EVENT_TYPES, db_index=True)
    timestamp = models.DateTimeField(db_index=True)
    request_id = models.UUIDField(db_index=True)
    client_ip = models.GenericIPAddressField(db_index=True)
    user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
    data = models.JSONField()

    class Meta:
        ordering = ['-timestamp']
        indexes = [
            models.Index(fields=['event_type', '-timestamp']),
            models.Index(fields=['client_ip', '-timestamp']),
            models.Index(fields=['user', '-timestamp']),
        ]

# Security monitoring views
# views.py
from django.http import JsonResponse
from django.db.models import Count, Q
from django.utils import timezone
from datetime import timedelta

def security_dashboard(request):
    """API endpoint for security monitoring dashboard"""
    if not request.user.is_staff:
        return JsonResponse({'error': 'Unauthorized'}, status=403)

    # Get metrics for last 24 hours
    since = timezone.now() - timedelta(hours=24)

    metrics = {
        'total_requests': SecurityAuditLog.objects.filter(
            event_type='http_request',
            timestamp__gte=since
        ).count(),

        'error_responses': SecurityAuditLog.objects.filter(
            event_type='http_response',
            timestamp__gte=since,
            data__status_code__gte=400
        ).count(),

        'unique_ips': SecurityAuditLog.objects.filter(
            timestamp__gte=since
        ).values('client_ip').distinct().count(),

        'top_error_paths': list(SecurityAuditLog.objects.filter(
            event_type='http_response',
            timestamp__gte=since,
            data__status_code__gte=400
        ).extra(
            select={'path': "data->>'path'"}
        ).values('path').annotate(
            count=Count('path')
        ).order_by('-count')[:10]),

        'suspicious_ips': list(SecurityAuditLog.objects.filter(
            timestamp__gte=since
        ).extra(
            select={'ip': 'client_ip'}
        ).values('ip').annotate(
            error_count=Count('id', filter=Q(
                event_type='http_response',
                data__status_code__gte=400
            ))
        ).filter(error_count__gt=10).order_by('-error_count')[:10])
    }

    return JsonResponse(metrics)

Integration and Configuration

Complete Settings Configuration

# settings.py - Security configuration
import os

# Security middleware configuration
MIDDLEWARE = [
    # Security middleware stack (order matters!)
    'myapp.middleware.SecurityHeadersMiddleware',
    'myapp.middleware.RateLimitMiddleware',
    'myapp.middleware.IPFilterMiddleware',
    'myapp.middleware.RequestValidationMiddleware',
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'myapp.middleware.JWTAuthenticationMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'myapp.middleware.SecurityAuditMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

# IP filtering settings
IP_WHITELIST = [
    '127.0.0.1',
    '10.0.0.0/8',
    '192.168.0.0/16',
]

IP_BLACKLIST = [
    # Add known malicious IPs
]

BLOCKED_COUNTRIES = [
    # Add country codes to block
]

# Threat intelligence API keys
ABUSEIPDB_API_KEY = os.environ.get('ABUSEIPDB_API_KEY')

# Security audit settings
SECURITY_AUDIT_ENABLED = True
AUDIT_WEBHOOK_URL = os.environ.get('AUDIT_WEBHOOK_URL')
AUDIT_STORE_IN_DB = True
AUDIT_SENSITIVE_FIELDS = [
    'password', 'token', 'secret', 'key', 'auth', 'session',
    'credit_card', 'ssn', 'social_security'
]

# Django security settings
SECURE_SSL_REDIRECT = True
SECURE_HSTS_SECONDS = 31536000  # 1 year
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
SECURE_CONTENT_TYPE_NOSNIFF = True
SECURE_BROWSER_XSS_FILTER = True
X_FRAME_OPTIONS = 'DENY'

# Session security
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Strict'
SESSION_COOKIE_AGE = 3600  # 1 hour

CSRF_COOKIE_SECURE = True
CSRF_COOKIE_HTTPONLY = True
CSRF_COOKIE_SAMESITE = 'Strict'

# Content Security Policy
CSP_DEFAULT_SRC = ("'self'",)
CSP_SCRIPT_SRC = ("'self'", "'unsafe-inline'")  # Adjust as needed
CSP_STYLE_SRC = ("'self'", "'unsafe-inline'")
CSP_IMG_SRC = ("'self'", "data:", "https:")
CSP_FONT_SRC = ("'self'",)
CSP_CONNECT_SRC = ("'self'",)
CSP_FRAME_ANCESTORS = ("'none'",)
CSP_BASE_URI = ("'self'",)
CSP_FORM_ACTION = ("'self'",)

# Logging configuration
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'security': {
            'format': '[SECURITY] {levelname} {asctime} {name} {process:d} {thread:d} {message}',
            'style': '{',
        },
        'audit': {
            'format': '[AUDIT] {asctime} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'security_file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'logs/security.log',
            'maxBytes': 10485760,  # 10MB
            'backupCount': 5,
            'formatter': 'security',
        },
        'audit_file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'logs/audit.log',
            'maxBytes': 52428800,  # 50MB
            'backupCount': 10,
            'formatter': 'audit',
        },
    },
    'loggers': {
        'security': {
            'handlers': ['security_file'],
            'level': 'INFO',
            'propagate': False,
        },
        'audit': {
            'handlers': ['audit_file'],
            'level': 'INFO',
            'propagate': False,
        },
    },
}

Testing Your Security Middleware

Unit Tests

# tests/test_security_middleware.py
from django.test import TestCase, RequestFactory
from django.contrib.auth.models import User
from unittest.mock import patch, Mock
from myapp.middleware.jwt_auth import JWTAuthenticationMiddleware
from myapp.middleware.rate_limit import RateLimitMiddleware

class SecurityMiddlewareTests(TestCase):
    def setUp(self):
        self.factory = RequestFactory()
        self.user = User.objects.create_user(
            username='testuser',
            password='testpass123'
        )

    def test_jwt_authentication_success(self):
        """Test successful JWT authentication"""
        middleware = JWTAuthenticationMiddleware(lambda r: Mock())

        # Create valid JWT token
        import jwt
        from django.conf import settings
        payload = {
            'user_id': self.user.id,
            'exp': datetime.utcnow() + timedelta(hours=1),
            'iat': datetime.utcnow(),
            'token_type': 'access'
        }
        token = jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')

        request = self.factory.get(
            '/api/test/',
            HTTP_AUTHORIZATION=f'Bearer {token}'
        )

        result = middleware.security_check(request)
        self.assertIsNone(result)  # Should pass security check
        self.assertEqual(request.user, self.user)

    def test_jwt_authentication_failure(self):
        """Test JWT authentication failure"""
        middleware = JWTAuthenticationMiddleware(lambda r: Mock())

        request = self.factory.get('/api/test/')  # No token

        result = middleware.security_check(request)
        self.assertIsNotNone(result)
        self.assertEqual(result.status_code, 401)

    def test_rate_limiting(self):
        """Test rate limiting functionality"""
        middleware = RateLimitMiddleware(lambda r: Mock())

        # First request should pass
        request1 = self.factory.get('/', REMOTE_ADDR='192.168.1.1')
        result1 = middleware.security_check(request1)
        self.assertIsNone(result1)

    @patch('myapp.middleware.validation.logger')
    def test_xss_detection(self, mock_logger):
        """Test XSS pattern detection"""
        from myapp.middleware.validation import RequestValidationMiddleware

        middleware = RequestValidationMiddleware(lambda r: Mock())

        request = self.factory.get('/?search=<script>alert("xss")</script>')
        result = middleware.security_check(request)

        self.assertIsNotNone(result)
        self.assertEqual(result.status_code, 400)
        mock_logger.warning.assert_called()

    def test_ip_filtering(self):
        """Test IP filtering"""
        from myapp.middleware.ip_filter import IPFilterMiddleware

        middleware = IPFilterMiddleware(lambda r: Mock())

        # Test with blocked IP
        request = self.factory.get('/', REMOTE_ADDR='192.168.1.100')

        with patch.object(middleware, '_is_blacklisted', return_value=True):
            result = middleware.security_check(request)
            self.assertIsNotNone(result)
            self.assertEqual(result.status_code, 403)

Conclusion

This comprehensive Django security arsenal provides multiple layers of protection:

  1. Authentication Security - Advanced JWT handling with blacklisting and session management
  2. Input Validation - XSS, SQL injection, and path traversal protection
  3. Rate Limiting - Multi-algorithm rate limiting with attack pattern detection
  4. IP Security - Geographic and threat intelligence-based filtering
  5. Audit Logging - Comprehensive security event monitoring

Key Benefits:

  • Defense in Depth - Multiple security layers working together
  • Real-time Monitoring - Immediate threat detection and response
  • Scalable Architecture - Designed for high-traffic applications
  • Compliance Ready - Comprehensive audit trails for compliance requirements
  • Customizable - Easily adaptable to specific security requirements

Best Practices for Implementation:

  1. Gradual Rollout - Implement middleware incrementally
  2. Monitoring First - Start with audit logging before enforcement
  3. Performance Testing - Verify security measures don’t impact performance
  4. Regular Updates - Keep threat intelligence and security rules current
  5. Team Training - Ensure your team understands the security architecture

Remember: Security is not a one-time implementation but an ongoing process. Regularly review your security posture, update threat patterns, and adapt to new attack vectors. The middleware architecture presented here provides a solid foundation that can evolve with your security needs.

Modern web applications face sophisticated threats, but with the right middleware stack, you can build robust defenses that protect your users and data while maintaining the performance and usability your applications demand.