Django Security Arsenal: Custom Middleware for Modern Threat Protection
Build advanced Django security with custom middleware for JWT authentication, XSS protection, rate limiting, and defense against modern attack vectors.
Django Security Arsenal: Custom Middleware for Modern Threat Protection
Django’s built-in security features are robust, but modern web applications face sophisticated threats that require advanced protection mechanisms. Custom middleware provides the perfect layer to implement security controls that operate on every request before it reaches your views.
This guide shows you how to build a comprehensive security middleware stack that protects against injection attacks, implements advanced authentication, prevents abuse, and monitors security events in real time.
Understanding Django Middleware Security Architecture
Middleware Execution Order
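Django runs middleware top-to-bottom through the MIDDLEWARE list while handling the request and bottom-to-top while handling the response, so the position of each entry determines which checks run first: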
# settings.py - Security-focused middleware stack
#
# Entries run top-to-bottom on the request and bottom-to-top on the response,
# so the cheap "reject early" checks are listed first.
MIDDLEWARE = [
    # 1. Security headers (first line of defense)
    'myapp.middleware.SecurityHeadersMiddleware',
    # 2. Rate limiting (prevent abuse)
    'myapp.middleware.RateLimitMiddleware',
    # 3. IP filtering (block malicious IPs)
    'myapp.middleware.IPFilterMiddleware',
    # 4. Request validation (sanitize input)
    'myapp.middleware.RequestValidationMiddleware',
    # Django's built-in security
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    # Django's auth middleware
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    # 5. Custom authentication (JWT/API tokens)
    # Must come AFTER AuthenticationMiddleware: that middleware assigns
    # request.user on every request, which would silently replace the user
    # the JWT middleware attached if the JWT middleware ran first.
    'myapp.middleware.JWTAuthenticationMiddleware',
    # 6. Audit logging (security monitoring)
    'myapp.middleware.SecurityAuditMiddleware',
    # Standard Django middleware
    'django.middleware.common.CommonMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
Base Security Middleware Class
# middleware/base.py
import logging
import time
from django.http import JsonResponse, HttpResponseForbidden
from django.core.cache import cache
logger = logging.getLogger('security')


class BaseSecurityMiddleware:
    """Base class for security middleware with common utilities.

    Subclasses override :meth:`security_check` to veto a request (by
    returning a response) and/or :meth:`post_process` to adjust the
    outgoing response.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        """Run the security check, then the rest of the middleware chain.

        Returns the blocking response from ``security_check`` when there is
        one; otherwise the downstream response after ``post_process``.
        """
        # Monotonic clock: elapsed time must not jump when the wall clock
        # is adjusted.
        start_time = time.monotonic()

        security_result = self.security_check(request)
        if security_result is not None:
            self.log_security_event(request, security_result)
            return security_result

        response = self.get_response(request)
        response = self.post_process(request, response)

        # The measured span includes the downstream view, not only this
        # middleware's own work.
        processing_time = time.monotonic() - start_time
        if processing_time > 1.0:  # Log slow requests
            logger.warning(
                f'Slow security processing: {processing_time:.2f}s '
                f'for {request.path}'
            )
        return response

    def security_check(self, request):
        """Override in subclasses; return a response to block, else None."""
        return None

    def post_process(self, request, response):
        """Override in subclasses for response modification."""
        return response

    def log_security_event(self, request, event_data):
        """Log a security event for monitoring.

        ``event_data`` may be a dict with a ``type`` key or the blocking
        response object that ``__call__`` passes in; for a response the
        HTTP status code is logged as the event type.
        """
        if isinstance(event_data, dict):
            event_type = event_data.get('type', 'unknown')
        else:
            status = getattr(event_data, 'status_code', None)
            event_type = 'unknown' if status is None else f'blocked ({status})'
        logger.warning(
            f'Security event: {event_type} '
            f'IP: {self.get_client_ip(request)} '
            f'Path: {request.path} '
            f'User-Agent: {request.META.get("HTTP_USER_AGENT", "unknown")}'
        )

    def get_client_ip(self, request):
        """Return the client IP, preferring the first X-Forwarded-For hop.

        NOTE: X-Forwarded-For is supplied by the client unless a trusted
        proxy overwrites it, so the value is only reliable behind such a
        proxy.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            first_hop = x_forwarded_for.split(',')[0].strip()
            # A header such as ", 10.0.0.1" has an empty first entry; fall
            # back to the socket address rather than returning ''.
            if first_hop:
                return first_hop
        return request.META.get('REMOTE_ADDR', 'unknown')

    def is_api_request(self, request):
        """Return True for /api/ paths or JSON content-type/Accept headers."""
        return (
            request.path.startswith('/api/') or
            request.content_type == 'application/json' or
            'application/json' in request.META.get('HTTP_ACCEPT', '')
        )

    def create_security_response(self, message, status_code=403):
        """Create a consistent JSON error response for blocked requests."""
        return JsonResponse({
            'error': 'Security violation',
            'message': message,
            'status': status_code
        }, status=status_code)
Advanced JWT Authentication Middleware
JWT Security Implementation
# middleware/jwt_auth.py
import jwt
import json
from datetime import datetime, timedelta
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.http import JsonResponse
from .base import BaseSecurityMiddleware
User = get_user_model()


class JWTAuthenticationMiddleware(BaseSecurityMiddleware):
    """Advanced JWT authentication with security features.

    API requests must carry a valid HS256 access token signed with
    ``settings.SECRET_KEY``; non-API requests pass through untouched.
    """

    # Maximum age in seconds of an access token, measured from ``iat``.
    # Also used as the blacklist TTL: an older token is rejected on age alone.
    MAX_TOKEN_AGE = 86400
    # Number of distinct token ids remembered per user.
    MAX_CONCURRENT_SESSIONS = 5

    def __init__(self, get_response):
        super().__init__(get_response)
        self.secret_key = settings.SECRET_KEY
        self.algorithm = 'HS256'
        self.token_blacklist_key = 'jwt_blacklist'

    def security_check(self, request):
        """Authenticate an API request.

        Returns None when the request may proceed (``request.user`` and
        ``request.jwt_payload`` are then set), otherwise a JSON error
        response with status 401, 403 or 500.
        """
        # Skip non-API requests
        if not self.is_api_request(request):
            return None

        token = self.extract_token(request)
        if not token:
            return self.create_security_response(
                'Authentication required',
                status_code=401
            )

        try:
            payload = self.validate_token(token)
            user = self.get_user_from_payload(payload)
            if not user:
                return self.create_security_response(
                    'Invalid user',
                    status_code=401
                )
            if not self.perform_security_checks(request, user, payload):
                return self.create_security_response(
                    'Security validation failed',
                    status_code=403
                )
            # Attach user to request
            request.user = user
            request.jwt_payload = payload
        except jwt.ExpiredSignatureError:
            # Must precede InvalidTokenError, which is its base class.
            return self.create_security_response(
                'Token expired',
                status_code=401
            )
        except jwt.InvalidTokenError as e:
            return self.create_security_response(
                f'Invalid token: {str(e)}',
                status_code=401
            )
        except Exception as e:
            logger.error(f'JWT validation error: {str(e)}')
            return self.create_security_response(
                'Authentication error',
                status_code=500
            )
        return None

    def extract_token(self, request):
        """Return the JWT string from the request, or None if absent.

        Sources, in priority order: ``Authorization: Bearer``, the
        ``X-Auth-Token`` header, then a ``token`` field in a JSON POST body
        (less secure, kept for legacy clients).
        """
        auth_header = request.META.get('HTTP_AUTHORIZATION', '')
        if auth_header.startswith('Bearer '):
            # Slice rather than split(' ')[1] so extra spaces after the
            # scheme do not yield an empty token.
            return auth_header[len('Bearer '):].strip()

        token = request.META.get('HTTP_X_AUTH_TOKEN')
        if token:
            return token

        if request.method == 'POST':
            try:
                data = json.loads(request.body)
            except ValueError:
                # Covers JSONDecodeError and the UnicodeDecodeError raised
                # for non-UTF-8 bodies (both are ValueError subclasses).
                return None
            if isinstance(data, dict):
                body_token = data.get('token')
                # Only a string can be a JWT; anything else is "no token".
                if isinstance(body_token, str):
                    return body_token
        return None

    def validate_token(self, token):
        """Decode and verify ``token``; return its payload.

        Raises jwt.InvalidTokenError (or a subclass) when the token is
        blacklisted, malformed, expired, or fails the payload checks.
        """
        if self.is_token_blacklisted(token):
            raise jwt.InvalidTokenError('Token is blacklisted')

        payload = jwt.decode(
            token,
            self.secret_key,
            algorithms=[self.algorithm],
            options={
                'verify_signature': True,
                'verify_exp': True,
                'verify_iat': True,
                'verify_nbf': True,
                'require': ['exp', 'iat', 'user_id']
            }
        )
        self.validate_payload_security(payload)
        return payload

    def validate_payload_security(self, payload):
        """Apply age, required-claim and token-type checks to ``payload``.

        Raises jwt.InvalidTokenError on the first failed check.
        """
        from datetime import timezone

        # Aware "now": calling .timestamp() on the naive utcnow() value
        # treats it as local time and skews the result by the UTC offset.
        now = datetime.now(timezone.utc).timestamp()

        issued_at = payload.get('iat', 0)
        if now - issued_at > self.MAX_TOKEN_AGE:
            raise jwt.InvalidTokenError('Token too old')

        for claim in ('user_id', 'exp', 'iat'):
            if claim not in payload:
                raise jwt.InvalidTokenError(f'Missing required claim: {claim}')

        # Tokens without an explicit type are treated as access tokens.
        if payload.get('token_type', 'access') != 'access':
            raise jwt.InvalidTokenError('Invalid token type')

    def get_user_from_payload(self, payload):
        """Return the active user named by ``payload['user_id']``, or None.

        The user object is cached for 5 minutes, so deactivating an account
        takes up to that long to affect already-cached users.
        """
        user_id = payload.get('user_id')
        if not user_id:
            return None

        cache_key = f'jwt_user_{user_id}'
        user = cache.get(cache_key)
        if user is None:
            try:
                user = User.objects.get(id=user_id, is_active=True)
            except User.DoesNotExist:
                return None
            cache.set(cache_key, user, 300)
        return user

    def perform_security_checks(self, request, user, payload):
        """Run per-request checks; return False to reject with 403."""
        client_ip = self.get_client_ip(request)

        # An unfamiliar location is only logged, never blocking.
        if not self.validate_ip_location(user, client_ip):
            logger.warning(
                f'Suspicious login location for user {user.id} '
                f'from IP {client_ip}'
            )

        if not self.check_concurrent_sessions(user, payload):
            logger.warning(
                f'Too many concurrent sessions for user {user.id}'
            )
            return False

        if not self.validate_user_permissions(user, request):
            return False

        self.update_user_activity(user, client_ip)
        return True

    def validate_ip_location(self, user, client_ip):
        """Record ``client_ip`` for ``user``; always returns True.

        Simplified placeholder for a GeoIP check: a previously unseen IP is
        logged and then remembered for 7 days.
        """
        cache_key = f'user_ips_{user.id}'
        known_ips = cache.get(cache_key, set())
        if known_ips and client_ip not in known_ips:
            logger.info(f'New IP {client_ip} for user {user.id}')
        known_ips.add(client_ip)
        cache.set(cache_key, known_ips, 86400 * 7)  # 7 days
        return True

    def check_concurrent_sessions(self, user, payload):
        """Track the user's recent token ids; always returns True.

        Only the most recently used ``MAX_CONCURRENT_SESSIONS`` ids are
        kept. Ids that drop off the list are not revoked.
        """
        session_key = f'user_sessions_{user.id}'
        # A list keeps use order, so trimming really drops the oldest ids;
        # slicing a set discards arbitrary ones. list() also converts set
        # values cached by earlier versions.
        sessions = list(cache.get(session_key, []))
        token_id = payload.get('jti', payload.get('iat'))
        if token_id in sessions:
            sessions.remove(token_id)
        sessions.append(token_id)
        sessions = sessions[-self.MAX_CONCURRENT_SESSIONS:]
        cache.set(session_key, sessions, 86400)
        return True

    def validate_user_permissions(self, user, request):
        """Return False for locked users or a pending password change."""
        if getattr(user, 'is_locked', False):
            return False
        if getattr(user, 'password_change_required', False):
            # Allow only password change endpoints
            allowed_paths = ['/api/auth/change-password/', '/api/auth/logout/']
            if request.path not in allowed_paths:
                return False
        return True

    def update_user_activity(self, user, client_ip):
        """Store last-seen time, IP and a request counter in the cache."""
        activity_key = f'user_activity_{user.id}'
        requests_key = f'user_requests_{user.id}'
        activity_data = {
            'last_seen': datetime.utcnow().isoformat(),
            'last_ip': client_ip,
            'request_count': cache.get(requests_key, 0) + 1
        }
        cache.set(activity_key, activity_data, 86400)
        cache.set(requests_key, activity_data['request_count'], 3600)

    def _blacklist_cache_key(self, token):
        """Return the cache key under which ``token``'s revocation is stored."""
        import hashlib

        # SHA-256 is stable across processes and restarts; the built-in
        # hash() is salted per process, so its values cannot be shared
        # through a cache.
        digest = hashlib.sha256(token.encode('utf-8')).hexdigest()
        return f'{self.token_blacklist_key}:{digest}'

    def is_token_blacklisted(self, token):
        """Return True if ``token`` has been blacklisted."""
        return cache.get(self._blacklist_cache_key(token)) is not None

    def blacklist_token(self, token):
        """Blacklist ``token`` for ``MAX_TOKEN_AGE`` seconds.

        One cache key per token avoids the lost-update race of rewriting a
        single shared set.
        """
        cache.set(self._blacklist_cache_key(token), True, self.MAX_TOKEN_AGE)
# Utility function for views to blacklist tokens
def blacklist_current_token(request):
    """Blacklist the JWT carried by ``request`` (intended for logout views).

    Does nothing unless the request has a ``jwt_payload`` attribute.
    """
    if not hasattr(request, 'jwt_payload'):
        return

    from .middleware.jwt_auth import JWTAuthenticationMiddleware

    # A throwaway instance is enough: only its token helpers are used.
    jwt_middleware = JWTAuthenticationMiddleware(None)
    # The raw token is not kept on the request, so it is extracted again.
    current_token = jwt_middleware.extract_token(request)
    if current_token:
        jwt_middleware.blacklist_token(current_token)
Advanced Request Validation Middleware
Input Sanitization and XSS Prevention
# middleware/validation.py
import re
import json
import bleach
from urllib.parse import unquote
from django.http import JsonResponse
from .base import BaseSecurityMiddleware
class RequestValidationMiddleware(BaseSecurityMiddleware):
    """Advanced request validation and sanitization.

    Rejects requests whose size, URL, headers or parameters match known
    attack patterns, strips disallowed HTML from accepted input, and adds
    security headers to responses.

    NOTE: the SQL patterns match bare keywords such as SELECT or UPDATE, so
    ordinary prose containing those words is rejected as well.
    """

    def __init__(self, get_response):
        super().__init__(get_response)
        # XSS patterns
        self.xss_patterns = [
            r'<script[^>]*>.*?</script>',
            r'javascript:',
            r'vbscript:',
            r'onload\s*=',
            r'onerror\s*=',
            r'onclick\s*=',
            r'onmouse\w+\s*=',
            r'<iframe[^>]*>',
            r'<object[^>]*>',
            r'<embed[^>]*>',
            r'<link[^>]*>',
            r'<meta[^>]*>',
        ]
        # SQL injection patterns
        self.sql_patterns = [
            r'(\b(ALTER|CREATE|DELETE|DROP|EXEC(UTE)?|INSERT|SELECT|UNION|UPDATE)\b)',
            r'(\b(OR|AND)\s+\d+\s*=\s*\d+)',
            r'(\b(OR|AND)\s+\w+\s*(LIKE|=)\s*\w+)',
            r'([\'"]\s*(OR|AND)\s+[\'"]\w+[\'"]\s*=\s*[\'"]\w+[\'"])',
            r'(UNION\s+SELECT)',
            r'(INFORMATION_SCHEMA|SYSOBJECTS|SYSCOLUMNS)',
        ]
        # Path traversal patterns
        self.path_traversal_patterns = [
            r'\.\./+',
            r'\.\.\\+',
            r'/etc/passwd',
            r'/etc/shadow',
            r'\.\.\..*/',
            r'%2e%2e%2f',
            r'%2e%2e\\',
        ]
        # Allowed HTML tags for content that needs basic formatting
        self.allowed_tags = [
            'p', 'br', 'strong', 'em', 'u', 'i', 'ul', 'ol', 'li',
            'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'blockquote', 'code', 'pre'
        ]
        self.allowed_attributes = {
            '*': ['class'],
            'a': ['href', 'title'],
            'img': ['src', 'alt', 'title', 'width', 'height']
        }

    def security_check(self, request):
        """Run all validations; return an error response on the first failure."""
        if not self.validate_request_size(request):
            return self.create_security_response(
                'Request too large',
                status_code=413
            )
        if not self.validate_url(request):
            return self.create_security_response(
                'Invalid request URL',
                status_code=400
            )
        if not self.validate_headers(request):
            return self.create_security_response(
                'Invalid request headers',
                status_code=400
            )
        if not self.sanitize_request_data(request):
            return self.create_security_response(
                'Request contains malicious content',
                status_code=400
            )
        return None

    def validate_request_size(self, request):
        """Return False when Content-Length exceeds 1MB (API) or 10MB (other)."""
        try:
            content_length = int(request.META.get('CONTENT_LENGTH', 0))
        except (ValueError, TypeError):
            # Missing or malformed header is treated as an empty body.
            content_length = 0

        if self.is_api_request(request):
            max_size = 1024 * 1024  # 1MB for API requests
        else:
            max_size = 10 * 1024 * 1024  # 10MB for regular requests

        if content_length > max_size:
            logger.warning(
                f'Request size limit exceeded: {content_length} bytes '
                f'from IP {self.get_client_ip(request)}'
            )
            return False
        return True

    def validate_url(self, request):
        """Return False when the URL matches a traversal or encoded-attack pattern."""
        raw_url = request.get_full_path()
        url = unquote(raw_url)
        # Percent-encoded patterns can only match the raw form and literal
        # ones only the decoded form, so both are inspected.
        candidates = (raw_url, url)

        for pattern in self.path_traversal_patterns:
            if any(re.search(pattern, c, re.IGNORECASE) for c in candidates):
                logger.warning(
                    f'Path traversal attempt: {url} '
                    f'from IP {self.get_client_ip(request)}'
                )
                return False

        if any(self._check_encoded_attacks(c) for c in candidates):
            return False

        if len(url) > 2048:
            logger.warning(f'Excessively long URL: {len(url)} characters')
            return False
        return True

    def validate_headers(self, request):
        """Return False for suspicious host-related headers or User-Agent."""
        dangerous_headers = [
            'HTTP_X_FORWARDED_HOST',
            'HTTP_X_FORWARDED_SERVER',
            'HTTP_HOST'
        ]
        for header_key in dangerous_headers:
            header_value = request.META.get(header_key, '')
            if header_value and not self._validate_header_value(header_value):
                logger.warning(
                    f'Malicious header {header_key}: {header_value} '
                    f'from IP {self.get_client_ip(request)}'
                )
                return False

        user_agent = request.META.get('HTTP_USER_AGENT', '')
        if self._is_malicious_user_agent(user_agent):
            return False
        return True

    def sanitize_request_data(self, request):
        """Validate and sanitize query and body data; False means reject."""
        try:
            if not self._sanitize_query_params(request):
                return False
            if request.method in ['POST', 'PUT', 'PATCH']:
                if not self._sanitize_post_data(request):
                    return False
        except Exception as e:
            # Fail closed: a request that cannot be inspected is rejected.
            logger.error(f'Error sanitizing request data: {str(e)}')
            return False
        return True

    def _screen_querydict(self, request, querydict, label):
        """Validate every value of ``querydict`` and sanitize HTML in place.

        ``label`` names the parameter kind in the log message. Returns False
        as soon as one value matches an attack pattern.
        """
        replacements = {}
        # lists() exposes every value of a repeated key; items() yields only
        # the last one, letting earlier values bypass validation.
        for key, values in querydict.lists():
            cleaned = []
            changed = False
            for value in values:
                if not self._validate_input_string(value):
                    logger.warning(
                        f'Malicious {label} parameter {key}: {value} '
                        f'from IP {self.get_client_ip(request)}'
                    )
                    return False
                if isinstance(value, str) and self._contains_html(value):
                    value = self._sanitize_html(value)
                    changed = True
                cleaned.append(value)
            if changed:
                replacements[key] = cleaned

        if replacements:
            # Request QueryDicts are immutable by default; unlock only for
            # the write and restore the previous state afterwards.
            was_mutable = querydict._mutable
            querydict._mutable = True
            try:
                for key, cleaned in replacements.items():
                    querydict.setlist(key, cleaned)
            finally:
                querydict._mutable = was_mutable
        return True

    def _sanitize_query_params(self, request):
        """Sanitize URL query parameters."""
        return self._screen_querydict(request, request.GET, 'query')

    def _sanitize_post_data(self, request):
        """Dispatch body sanitization on the request content type."""
        content_type = request.content_type.lower()
        if content_type == 'application/json':
            return self._sanitize_json_data(request)
        if content_type.startswith('application/x-www-form-urlencoded'):
            return self._sanitize_form_data(request)
        if content_type.startswith('multipart/form-data'):
            return self._sanitize_multipart_data(request)
        # Other content types are passed through unchecked.
        return True

    def _sanitize_json_data(self, request):
        """Strip disallowed HTML from string values in a JSON body.

        Returns False when the body is not valid JSON.
        """
        try:
            # Read via the ``body`` property: the private ``_body``
            # attribute only exists after the first read, so testing for it
            # skipped sanitization for every fresh request.
            raw_body = request.body
            if raw_body:
                data = json.loads(raw_body)
                sanitized_data = self._sanitize_dict_recursive(data)
                # Rewrite only when something changed, so untouched bodies
                # stay byte-identical (e.g. for signature verification).
                if sanitized_data != data:
                    request._body = json.dumps(sanitized_data).encode('utf-8')
        except json.JSONDecodeError:
            logger.warning(f'Invalid JSON in request from IP {self.get_client_ip(request)}')
            return False
        except Exception as e:
            logger.error(f'Error sanitizing JSON data: {str(e)}')
            return False
        return True

    def _sanitize_form_data(self, request):
        """Sanitize form-encoded data."""
        return self._screen_querydict(request, request.POST, 'POST')

    def _sanitize_multipart_data(self, request):
        """Sanitize multipart text fields and validate uploaded files.

        Note: This is simplified - production code would need more
        sophisticated file upload validation.
        """
        # Text fields get the same treatment as urlencoded ones, so changing
        # the content type does not skip HTML sanitization.
        if not self._screen_querydict(request, request.POST, 'POST'):
            return False
        for _field, uploads in request.FILES.lists():
            for uploaded_file in uploads:
                if not self._validate_uploaded_file(uploaded_file):
                    return False
        return True

    def _sanitize_dict_recursive(self, data):
        """Return ``data`` with HTML sanitized in every nested string value."""
        if isinstance(data, dict):
            return {
                key: self._sanitize_dict_recursive(value)
                for key, value in data.items()
            }
        if isinstance(data, list):
            return [self._sanitize_dict_recursive(item) for item in data]
        if isinstance(data, str) and self._contains_html(data):
            return self._sanitize_html(data)
        return data

    def _validate_input_string(self, value):
        """Return False if ``value`` matches an XSS/SQL pattern or has a NUL byte.

        Non-string values are accepted unchanged.
        """
        if not isinstance(value, str):
            return True
        # DOTALL lets '.' span newlines, so a <script> block split across
        # lines is still recognised.
        flags = re.IGNORECASE | re.DOTALL
        for pattern in self.xss_patterns:
            if re.search(pattern, value, flags):
                return False
        for pattern in self.sql_patterns:
            if re.search(pattern, value, flags):
                return False
        if '\x00' in value:
            return False
        return True

    def _contains_html(self, value):
        """Check if string contains something shaped like an HTML tag."""
        return bool(re.search(r'<[^>]+>', value))

    def _sanitize_html(self, value):
        """Strip all tags and attributes outside the allow-lists."""
        return bleach.clean(
            value,
            tags=self.allowed_tags,
            attributes=self.allowed_attributes,
            strip=True
        )

    def _validate_header_value(self, header_value):
        """Return False for CRLF injection or script markers in a header."""
        if '\r' in header_value or '\n' in header_value:
            return False
        if re.search(r'<script|javascript:', header_value, re.IGNORECASE):
            return False
        return True

    def _is_malicious_user_agent(self, user_agent):
        """Return True when the User-Agent names a known scanner or carries script."""
        malicious_patterns = [
            r'sqlmap',
            r'nikto',
            r'netsparker',
            r'acunetix',
            r'burpsuite',
            r'grabber',
            r'w3af',
            r'havij',
            r'<script',
            r'javascript:',
        ]
        for pattern in malicious_patterns:
            if re.search(pattern, user_agent, re.IGNORECASE):
                logger.warning(f'Malicious User-Agent detected: {user_agent}')
                return True
        return False

    def _check_encoded_attacks(self, url):
        """Return True when ``url`` contains a percent-encoded ``<script``."""
        encoded_patterns = [
            r'%3c%73%63%72%69%70%74',  # <script
            r'%22%3e%3c%73%63%72%69%70%74',  # "><script
            r'%27%3e%3c%73%63%72%69%70%74',  # '><script
        ]
        for pattern in encoded_patterns:
            if re.search(pattern, url, re.IGNORECASE):
                logger.warning(f'Encoded attack pattern in URL: {url}')
                return True
        return False

    def _validate_uploaded_file(self, uploaded_file):
        """Return False for files that are too big, of a disallowed type, or oddly named."""
        # Check file size (10MB limit)
        if uploaded_file.size > 10 * 1024 * 1024:
            return False

        allowed_extensions = (
            '.jpg', '.jpeg', '.png', '.gif', '.pdf', '.doc', '.docx',
            '.xls', '.xlsx', '.txt', '.csv', '.zip'
        )
        filename = uploaded_file.name.lower()
        if not filename.endswith(allowed_extensions):
            logger.warning(f'Disallowed file extension: {filename}')
            return False

        # Reject path separators and shell/filesystem metacharacters.
        if re.search(r'[<>:"/\\|?*]', uploaded_file.name):
            return False
        return True

    def post_process(self, request, response):
        """Add security headers the response does not already carry."""
        security_headers = {
            'X-Content-Type-Options': 'nosniff',
            'X-Frame-Options': 'DENY',
            'X-XSS-Protection': '1; mode=block',
            'Referrer-Policy': 'strict-origin-when-cross-origin',
            'Content-Security-Policy': self._get_csp_header(request),
        }
        for header, value in security_headers.items():
            if header not in response:
                response[header] = value
        return response

    def _get_csp_header(self, request):
        """Build the Content-Security-Policy header value."""
        # Customize based on your application needs
        csp_policies = [
            "default-src 'self'",
            "script-src 'self' 'unsafe-inline'",  # Adjust as needed
            "style-src 'self' 'unsafe-inline'",
            "img-src 'self' data: https:",
            "font-src 'self'",
            "connect-src 'self'",
            "frame-ancestors 'none'",
            "base-uri 'self'",
            "form-action 'self'"
        ]
        return '; '.join(csp_policies)
Rate Limiting and DDoS Protection
Intelligent Rate Limiting Middleware
# middleware/rate_limit.py
import time
import hashlib
from collections import defaultdict
from django.core.cache import cache
from django.http import HttpResponse
from .base import BaseSecurityMiddleware
class RateLimitMiddleware(BaseSecurityMiddleware):
    """Advanced rate limiting with multiple algorithms.

    Combines a per-IP token bucket, per-user and per-endpoint sliding
    windows, and a simple scan/flood detector. All state is kept in the
    Django cache with plain get/set calls, so concurrent updates are not
    atomic.
    """

    def __init__(self, get_response):
        super().__init__(get_response)
        # Rate limit configurations
        self.rate_limits = {
            # Per-IP limits
            'ip': {
                'requests': 100,
                'window': 60,  # 60 seconds
                'burst': 20  # Burst allowance
            },
            # Per-user limits (authenticated users)
            'user': {
                'requests': 500,
                'window': 60,
                'burst': 50
            },
            # Per-endpoint limits
            'endpoint': {
                '/api/auth/login/': {'requests': 5, 'window': 60},
                '/api/auth/register/': {'requests': 3, 'window': 300},
                '/api/password/reset/': {'requests': 2, 'window': 300},
            }
        }
        # Sliding window cache
        self.sliding_windows = defaultdict(list)

    def security_check(self, request):
        """Apply every limiter in turn; return a 429 response on the first hit."""
        client_ip = self.get_client_ip(request)

        if not self._check_ip_rate_limit(client_ip, request):
            return self._create_rate_limit_response('IP rate limit exceeded')

        # The user limit only applies when an earlier middleware has already
        # attached an authenticated user to the request.
        if hasattr(request, 'user') and request.user.is_authenticated:
            if not self._check_user_rate_limit(request.user.id, request):
                return self._create_rate_limit_response('User rate limit exceeded')

        if not self._check_endpoint_rate_limit(request.path, client_ip):
            return self._create_rate_limit_response('Endpoint rate limit exceeded')

        if self._detect_attack_pattern(client_ip, request):
            return self._create_rate_limit_response('Suspicious activity detected')
        return None

    def _check_ip_rate_limit(self, client_ip, request):
        """Check IP-based rate limit using a token bucket; True means allowed."""
        limits = self.rate_limits['ip']
        cache_key = f'rate_limit_ip_{client_ip}'
        now = time.time()
        bucket = cache.get(cache_key, {
            'tokens': limits['requests'],
            'last_update': now
        })

        # Refill in proportion to elapsed time, capped at bucket capacity.
        refill_rate = limits['requests'] / limits['window']
        elapsed = now - bucket['last_update']
        bucket['tokens'] = min(
            limits['requests'],
            bucket['tokens'] + elapsed * refill_rate
        )
        bucket['last_update'] = now

        allowed = bucket['tokens'] >= 1
        if allowed:
            bucket['tokens'] -= 1
        else:
            logger.warning(
                f'IP rate limit exceeded: {client_ip} '
                f'Path: {request.path}'
            )
        cache.set(cache_key, bucket, limits['window'] * 2)
        return allowed

    def _sliding_window_allows(self, cache_key, limit, window):
        """Record a hit under ``cache_key`` unless ``limit`` hits already
        occurred in the last ``window`` seconds; return True if recorded."""
        now = time.time()
        window_start = now - window
        hits = [ts for ts in cache.get(cache_key, []) if ts > window_start]
        if len(hits) >= limit:
            return False
        hits.append(now)
        cache.set(cache_key, hits, window)
        return True

    def _check_user_rate_limit(self, user_id, request):
        """Check user-based rate limit using a sliding window."""
        limits = self.rate_limits['user']
        cache_key = f'rate_limit_user_{user_id}'
        if self._sliding_window_allows(cache_key, limits['requests'], limits['window']):
            return True
        logger.warning(
            f'User rate limit exceeded: {user_id} '
            f'Path: {request.path}'
        )
        return False

    def _check_endpoint_rate_limit(self, path, client_ip):
        """Check endpoint-specific rate limits (exact path match only)."""
        endpoint_config = self.rate_limits['endpoint'].get(path)
        if endpoint_config is None:
            return True

        # Hash path+IP so the cache key has a fixed, safe shape.
        key_digest = hashlib.md5(f"{path}_{client_ip}".encode()).hexdigest()
        cache_key = f'rate_limit_endpoint_{key_digest}'
        if self._sliding_window_allows(
            cache_key, endpoint_config['requests'], endpoint_config['window']
        ):
            return True
        logger.warning(
            f'Endpoint rate limit exceeded: {path} '
            f'IP: {client_ip}'
        )
        return False

    def _detect_attack_pattern(self, client_ip, request):
        """Return True when the IP looks like it is scanning paths or flooding."""
        pattern_key = f'attack_pattern_{client_ip}'
        patterns = cache.get(pattern_key)
        if patterns is None:
            # No previous request is known, so the first one cannot count
            # as "rapid".
            patterns = {
                'rapid_requests': 0,
                'error_requests': 0,
                'last_request_time': None,
                'paths': []
            }

        current_time = time.time()
        previous_time = patterns['last_request_time']
        # Update on every request, blocked ones included; otherwise a flood
        # keeps being compared against a stale timestamp and its counter
        # oscillates instead of growing.
        patterns['last_request_time'] = current_time

        if previous_time is not None:
            if current_time - previous_time < 0.1:  # Less than 100ms
                patterns['rapid_requests'] += 1
            else:
                patterns['rapid_requests'] = max(0, patterns['rapid_requests'] - 1)

        # Keep only the 20 most recent paths.
        patterns['paths'].append(request.path)
        patterns['paths'] = patterns['paths'][-20:]

        cache.set(pattern_key, patterns, 300)

        unique_paths = len(set(patterns['paths']))
        if unique_paths > 15:  # More than 15 different paths recently
            logger.warning(
                f'Potential path scanning detected: {client_ip} '
                f'Unique paths: {unique_paths}'
            )
            return True

        if patterns['rapid_requests'] > 10:
            logger.warning(
                f'Rapid request pattern detected: {client_ip} '
                f'Rapid requests: {patterns["rapid_requests"]}'
            )
            return True
        return False

    def _create_rate_limit_response(self, message):
        """Create a 429 response; headers always report the per-IP limits."""
        response = HttpResponse(
            content=f'Rate limit exceeded: {message}',
            status=429,
            content_type='text/plain'
        )
        ip_limits = self.rate_limits['ip']
        response['Retry-After'] = str(ip_limits['window'])
        response['X-RateLimit-Limit'] = str(ip_limits['requests'])
        response['X-RateLimit-Window'] = str(ip_limits['window'])
        return response
# Rate limit decorator for views
def rate_limit(requests_per_minute=60):
    """Decorator for additional view-level rate limiting.

    Allows at most ``requests_per_minute`` calls per client address
    (``REMOTE_ADDR``) in each 60-second window that starts with the
    client's first request; further calls get an HTTP 429 response.
    """
    from functools import wraps

    def decorator(view_func):
        # Module-qualified name keeps same-named views in different
        # modules from sharing a counter.
        view_id = f'{view_func.__module__}.{view_func.__qualname__}'

        @wraps(view_func)
        def wrapper(request, *args, **kwargs):
            client_ip = request.META.get('REMOTE_ADDR', 'unknown')
            cache_key = f'view_rate_limit_{view_id}_{client_ip}'

            # add() creates the counter only when it is absent, so the
            # 60-second expiry is fixed at the first request of the window;
            # re-setting the key on every hit would keep extending it.
            cache.add(cache_key, 0, 60)
            try:
                current_count = cache.incr(cache_key)
            except ValueError:
                # The counter expired between add() and incr().
                cache.set(cache_key, 1, 60)
                current_count = 1

            if current_count > requests_per_minute:
                return HttpResponse(
                    'Rate limit exceeded for this endpoint',
                    status=429
                )
            return view_func(request, *args, **kwargs)
        return wrapper
    return decorator
IP Filtering and Geoblocking
Advanced IP Security Middleware
# middleware/ip_filter.py
import ipaddress
import requests
from django.conf import settings
from django.core.cache import cache
from django.http import HttpResponseForbidden
from .base import BaseSecurityMiddleware
class IPFilterMiddleware(BaseSecurityMiddleware):
"""IP filtering with geolocation and threat intelligence"""
def __init__(self, get_response):
    """Load IP allow/deny lists and geo settings from Django settings.

    Every setting is optional; missing lists default to empty and the
    GeoIP service to None.
    """
    super().__init__(get_response)

    allowed_entries = getattr(settings, 'IP_WHITELIST', [])
    denied_entries = getattr(settings, 'IP_BLACKLIST', [])

    # Whitelisted addresses are always allowed, blacklisted always blocked.
    self.whitelist = self._parse_ip_list(allowed_entries)
    self.blacklist = self._parse_ip_list(denied_entries)

    # ISO country codes to reject.
    self.blocked_countries = getattr(settings, 'BLOCKED_COUNTRIES', [])
    # GeoIP service (configure your preferred service).
    self.geoip_service = getattr(settings, 'GEOIP_SERVICE', None)
def security_check(self, request):
    """Return a blocking response for disallowed client IPs, else None.

    Order: internal and whitelisted addresses pass immediately; then the
    blacklist, threat intelligence and geo restrictions may block.
    Tor/proxy access is only logged.
    """
    client_ip = self.get_client_ip(request)

    # NOTE(review): client_ip comes from get_client_ip(), defined outside
    # this block; if it honours client-supplied headers, this shortcut can
    # be reached with a spoofed address - confirm.
    if self._is_internal_ip(client_ip) or self._is_whitelisted(client_ip):
        return None

    block_message = None
    if self._is_blacklisted(client_ip):
        logger.warning(f'Blocked IP from blacklist: {client_ip}')
        block_message = 'IP address is blocked'
    elif self._is_threat_ip(client_ip):
        logger.warning(f'Blocked threat IP: {client_ip}')
        block_message = 'IP address flagged as threat'
    elif not self._check_geo_restrictions(client_ip):
        logger.warning(f'Blocked IP from restricted country: {client_ip}')
        block_message = 'Access from your location is restricted'

    if block_message is not None:
        return self._create_blocked_response(block_message)

    # Policy decision: Tor/proxy traffic is allowed but logged.
    if self._is_tor_or_proxy(client_ip):
        logger.info(f'Tor/Proxy access detected: {client_ip}')
    return None
def _parse_ip_list(self, ip_list):
"""Parse IP addresses and CIDR ranges"""
parsed_ips = []
for ip_str in ip_list:
try:
if '/' in ip_str:
# CIDR range
parsed_ips.append(ipaddress.ip_network(ip_str, strict=False))
else:
# Single IP
parsed_ips.append(ipaddress.ip_address(ip_str))
except ValueError as e:
logger.error(f'Invalid IP in configuration: {ip_str} - {e}')
return parsed_ips
def _is_internal_ip(self, ip_str):
"""Check if IP is internal/private"""
try:
ip = ipaddress.ip_address(ip_str)
return ip.is_private or ip.is_loopback
except ValueError:
return False
def _is_whitelisted(self, ip_str):
"""Check if IP is whitelisted"""
try:
ip = ipaddress.ip_address(ip_str)
for allowed in self.whitelist:
if isinstance(allowed, ipaddress.IPv4Address) or isinstance(allowed, ipaddress.IPv6Address):
if ip == allowed:
return True
elif isinstance(allowed, ipaddress.IPv4Network) or isinstance(allowed, ipaddress.IPv6Network):
if ip in allowed:
return True
except ValueError:
pass
return False
def _is_blacklisted(self, ip_str):
"""Check if IP is blacklisted"""
try:
ip = ipaddress.ip_address(ip_str)
for blocked in self.blacklist:
if isinstance(blocked, ipaddress.IPv4Address) or isinstance(blocked, ipaddress.IPv6Address):
if ip == blocked:
return True
elif isinstance(blocked, ipaddress.IPv4Network) or isinstance(blocked, ipaddress.IPv6Network):
if ip in blocked:
return True
except ValueError:
pass
return False
def _is_threat_ip(self, ip_str):
    """Check an IP against threat intelligence feeds, with a 1-hour cache.

    Args:
        ip_str: Client IP address as a string.

    Returns:
        True when a configured feed flags the address, otherwise False.
        The verdict (either way) is cached under ``threat_ip_<ip>`` for
        one hour, so a feed outage is also remembered as 'clean' for
        that long.
    """
    cache_key = f'threat_ip_{ip_str}'
    cached_result = cache.get(cache_key)
    if cached_result is not None:
        return cached_result == 'threat'

    is_threat = False

    # Test the value, not mere presence: a settings module that does
    # ``ABUSEIPDB_API_KEY = os.environ.get(...)`` defines the attribute as
    # None when the variable is unset, and ``hasattr`` would then trigger
    # a pointless, unauthenticated API call for every uncached IP.
    if getattr(settings, 'ABUSEIPDB_API_KEY', None):
        is_threat = self._check_abuseipdb(ip_str)

    # Cache result for 1 hour
    cache.set(cache_key, 'threat' if is_threat else 'clean', 3600)
    return is_threat
def _check_abuseipdb(self, ip_str):
    """Ask the AbuseIPDB v2 ``check`` endpoint whether an IP looks abusive.

    Args:
        ip_str: IP address to look up.

    Returns:
        True when the reported abuse confidence is above 25, otherwise
        False. Any failure (network error, non-200 status, unparseable or
        unexpectedly shaped body) is treated as "not a threat" so that an
        outage of the third-party service cannot block traffic.
    """
    url = 'https://api.abuseipdb.com/api/v2/check'
    headers = {
        'Key': settings.ABUSEIPDB_API_KEY,
        'Accept': 'application/json',
    }
    # The ``verbose`` flag is intentionally not sent: it only adds the
    # individual reports to the payload, which this check never reads.
    params = {
        'ipAddress': ip_str,
        'maxAgeInDays': 90,
    }
    try:
        response = requests.get(url, headers=headers, params=params, timeout=5)
        if response.status_code != 200:
            return False
        # ValueError covers a body that is not valid JSON.
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        logger.error(f'Error checking AbuseIPDB: {e}')
        return False

    report = payload.get('data') if isinstance(payload, dict) else None
    if not isinstance(report, dict):
        return False

    # APIv2 reports the score as ``abuseConfidenceScore``; the legacy key
    # is kept as a fallback so older/alternative payloads still work.
    score = report.get('abuseConfidenceScore')
    if score is None:
        score = report.get('abuseConfidencePercentage', 0)

    # Consider IPs with >25% abuse confidence as threats
    return isinstance(score, (int, float)) and score > 25
def _check_geo_restrictions(self, ip_str):
"""Check IP against geographic restrictions"""
if not self.blocked_countries:
return True
cache_key = f'geoip_{ip_str}'
cached_country = cache.get(cache_key)
if cached_country is None:
cached_country = self._get_country_code(ip_str)
# Cache for 24 hours
cache.set(cache_key, cached_country or 'unknown', 86400)
return cached_country not in self.blocked_countries
def _get_country_code(self, ip_str):
    """Look up the country code for an IP via ipapi.co.

    Returns the stripped response text on HTTP 200; returns ``None`` on
    any other status or when the request itself fails (which is logged).
    """
    # Example using ipapi.co (free tier available)
    lookup_url = f'https://ipapi.co/{ip_str}/country/'
    try:
        response = requests.get(lookup_url, timeout=5)
    except requests.RequestException as e:
        logger.error(f'Error getting country code: {e}')
        return None

    if response.status_code != 200:
        return None
    return response.text.strip()
def _is_tor_or_proxy(self, ip_str):
    """Report whether an IP is a Tor exit node or known proxy (6-hour cache).

    The verdict is stored under ``tor_proxy_<ip>`` as 'proxy' or 'direct'.
    """
    cache_key = f'tor_proxy_{ip_str}'
    verdict = cache.get(cache_key)
    if verdict is not None:
        return verdict == 'proxy'

    # Both detectors are consulted on every cache miss, Tor first.
    tor_hit = self._check_tor_exit_nodes(ip_str)
    proxy_hit = self._check_proxy_services(ip_str)
    is_proxy = True if (tor_hit or proxy_hit) else False

    # 21600 seconds = 6 hours
    cache.set(cache_key, 'proxy' if is_proxy else 'direct', 21600)
    return is_proxy
def _check_tor_exit_nodes(self, ip_str):
"""Check against Tor exit node list"""
# This would integrate with Tor's exit node list
# For simplicity, returning False here
return False
def _check_proxy_services(self, ip_str):
"""Check against known proxy/VPN services"""
# This would integrate with proxy detection services
# For simplicity, returning False here
return False
def _create_blocked_response(self, message):
    """Build the plain-text 403 response returned for a blocked IP."""
    body = f'Access Denied: {message}'
    return HttpResponseForbidden(body, content_type='text/plain')
Security Monitoring and Audit Logging
Comprehensive Security Audit Middleware
# middleware/security_audit.py
import json
import uuid
from datetime import datetime
from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from .base import BaseSecurityMiddleware
class SecurityAuditMiddleware(BaseSecurityMiddleware):
    """Emit one audit event per request and one per response.

    Every request is given a UUID (also exposed to views as
    ``request.security_audit_id``) that ties its ``http_request`` and
    ``http_response`` events together. Events are written to the
    module-level ``logger`` and, when configured, posted to a webhook and
    stored in the ``SecurityAuditLog`` table.

    Settings read: ``SECURITY_AUDIT_ENABLED`` (default True),
    ``AUDIT_SENSITIVE_FIELDS``, ``AUDIT_WEBHOOK_URL``, ``AUDIT_STORE_IN_DB``.
    """

    def __init__(self, get_response):
        super().__init__(get_response)
        self.audit_enabled = getattr(settings, 'SECURITY_AUDIT_ENABLED', True)
        # Substrings that mark a field name as sensitive (matched
        # case-insensitively by _is_sensitive_field).
        self.sensitive_fields = getattr(settings, 'AUDIT_SENSITIVE_FIELDS', [
            'password', 'token', 'secret', 'key', 'auth', 'session'
        ])

    def __call__(self, request):
        if not self.audit_enabled:
            return self.get_response(request)

        # Generate request ID for tracking
        request_id = str(uuid.uuid4())
        request.security_audit_id = request_id

        self._log_request(request, request_id)
        response = self.get_response(request)
        self._log_response(request, response, request_id)
        return response

    def _get_user_id(self, request):
        """Return ``request.user.id`` when available, else None."""
        if not hasattr(request, 'user'):
            return None
        return getattr(request.user, 'id', None)

    def _session_fingerprint(self, request):
        """Return a short one-way digest of the session key, or None.

        The raw session key is a bearer credential, so it must not be
        written to logs; a truncated SHA-256 digest still lets events of
        the same session be correlated.
        """
        import hashlib

        session = getattr(request, 'session', None)
        session_key = getattr(session, 'session_key', None)
        if not session_key:
            return None
        return hashlib.sha256(session_key.encode('utf-8')).hexdigest()[:16]

    def _log_request(self, request, request_id):
        """Log security-relevant request data.

        Query parameters and JSON / form bodies are passed through
        ``_sanitize_data``; the Authorization header is masked.
        """
        audit_data = {
            'event_type': 'http_request',
            'request_id': request_id,
            'timestamp': datetime.utcnow().isoformat(),
            'client_ip': self.get_client_ip(request),
            'user_id': self._get_user_id(request),
            'user_agent': request.META.get('HTTP_USER_AGENT', ''),
            'method': request.method,
            'path': request.path,
            'query_params': self._sanitize_data(dict(request.GET)),
            'content_type': request.content_type,
            'content_length': request.META.get('CONTENT_LENGTH', 0),
            'referer': request.META.get('HTTP_REFERER', ''),
            # Key name kept for schema compatibility; the value is a digest,
            # never the raw session key.
            'session_key': self._session_fingerprint(request),
        }

        # Add POST data for certain content types (sanitized)
        if request.method in ('POST', 'PUT', 'PATCH'):
            if request.content_type == 'application/json':
                try:
                    audit_data['post_data'] = self._sanitize_data(
                        json.loads(request.body)
                    )
                except (json.JSONDecodeError, UnicodeDecodeError):
                    audit_data['post_data'] = '<invalid_json>'
            elif request.content_type.startswith('application/x-www-form-urlencoded'):
                audit_data['post_data'] = self._sanitize_data(dict(request.POST))

        # Add security-relevant headers
        security_headers = [
            'HTTP_X_FORWARDED_FOR', 'HTTP_X_REAL_IP', 'HTTP_X_FORWARDED_PROTO',
            'HTTP_AUTHORIZATION', 'HTTP_X_REQUESTED_WITH', 'HTTP_ORIGIN',
            'HTTP_SEC_FETCH_SITE', 'HTTP_SEC_FETCH_MODE', 'HTTP_SEC_FETCH_DEST'
        ]
        headers = {}
        for header in security_headers:
            value = request.META.get(header)
            if not value:
                continue
            if 'AUTHORIZATION' in header:
                # Never log credentials; keep only the auth scheme.
                headers[header] = self._mask_auth_header(value)
            else:
                headers[header] = value
        audit_data['headers'] = headers

        self._write_audit_log(audit_data)

    def _log_response(self, request, response, request_id):
        """Log security-relevant response data.

        The event repeats the request's client IP, user id, method and
        path so that a response row is self-contained: it can be stored
        in ``SecurityAuditLog`` (whose writer reads ``client_ip`` and
        ``user_id`` from the event) and grouped by path without joining
        back to the request event.
        """
        audit_data = {
            'event_type': 'http_response',
            'request_id': request_id,
            'timestamp': datetime.utcnow().isoformat(),
            'client_ip': self.get_client_ip(request),
            'user_id': self._get_user_id(request),
            'method': request.method,
            'path': request.path,
            'status_code': response.status_code,
            'content_type': response.get('Content-Type', ''),
            # Streaming responses expose no usable ``content``; report 0.
            'content_length': len(response.content) if hasattr(response, 'content') else 0,
        }

        # Log security headers
        security_headers = [
            'Content-Security-Policy', 'X-Frame-Options', 'X-Content-Type-Options',
            'X-XSS-Protection', 'Strict-Transport-Security', 'Set-Cookie'
        ]
        response_headers = {}
        for header in security_headers:
            value = response.get(header)
            if not value:
                continue
            if header == 'Set-Cookie':
                response_headers[header] = self._mask_cookie_data(value)
            else:
                response_headers[header] = value

        # Cookies set through response.set_cookie() live in
        # ``response.cookies`` rather than in the header mapping, so the
        # loop above does not see them; record their names (values hidden).
        cookie_names = list(getattr(response, 'cookies', None) or [])
        if cookie_names and 'Set-Cookie' not in response_headers:
            response_headers['Set-Cookie'] = '; '.join(
                f"{name}=<redacted>" for name in cookie_names
            )
        audit_data['headers'] = response_headers

        # Log errors and security violations
        if response.status_code >= 400:
            audit_data['error_response'] = True
            # Additional context for security-related status codes
            if response.status_code == 401:
                audit_data['security_event'] = 'authentication_failure'
            elif response.status_code == 403:
                audit_data['security_event'] = 'authorization_failure'
            elif response.status_code == 429:
                audit_data['security_event'] = 'rate_limit_exceeded'

        self._write_audit_log(audit_data)

    def _sanitize_data(self, data):
        """Return a copy of ``data`` with sensitive values replaced.

        Dicts and lists are walked recursively; any dict value whose key
        looks sensitive becomes ``'<redacted>'``. Other values are
        returned unchanged.
        """
        if isinstance(data, dict):
            sanitized = {}
            for key, value in data.items():
                if self._is_sensitive_field(key):
                    sanitized[key] = '<redacted>'
                elif isinstance(value, (dict, list)):
                    sanitized[key] = self._sanitize_data(value)
                else:
                    sanitized[key] = value
            return sanitized
        if isinstance(data, list):
            return [self._sanitize_data(item) for item in data]
        return data

    def _is_sensitive_field(self, field_name):
        """Check if a field name contains any configured sensitive substring."""
        # str() guards against non-string keys reaching .lower().
        field_lower = str(field_name).lower()
        return any(sensitive in field_lower for sensitive in self.sensitive_fields)

    def _mask_auth_header(self, auth_header):
        """Mask an Authorization header: keep the scheme, hide credentials."""
        parts = auth_header.split(' ', 1)
        if len(parts) == 2:
            return f"{parts[0]} <redacted>"
        return "<redacted>"

    def _mask_cookie_data(self, cookie_value):
        """Mask a Set-Cookie header value for logging.

        Every ``name=value`` segment (cookie attributes such as ``Path``
        included) keeps its name and loses its value; bare flags are kept.
        """
        cookies = []
        for cookie_part in cookie_value.split(';'):
            if '=' in cookie_part:
                name, _ = cookie_part.strip().split('=', 1)
                cookies.append(f"{name}=<redacted>")
            else:
                cookies.append(cookie_part.strip())
        return '; '.join(cookies)

    def _write_audit_log(self, audit_data):
        """Write audit data to every configured sink."""
        # Log to Django logger
        logger.info(
            json.dumps(audit_data, cls=DjangoJSONEncoder),
            extra={'audit_data': audit_data}
        )

        # Optional: send to external logging service. Test the value, not
        # presence: a settings module using os.environ.get() defines the
        # attribute as None when the variable is unset.
        # NOTE(review): the webhook call is synchronous and runs twice per
        # request; consider moving it off the request path.
        if getattr(settings, 'AUDIT_WEBHOOK_URL', None):
            self._send_to_webhook(audit_data)

        # Optional: Store in database for complex queries
        if getattr(settings, 'AUDIT_STORE_IN_DB', False):
            self._store_in_database(audit_data)

    def _send_to_webhook(self, audit_data):
        """Send audit data to the external webhook (best effort)."""
        try:
            import requests
            requests.post(
                settings.AUDIT_WEBHOOK_URL,
                json=audit_data,
                timeout=5,
                headers={'Content-Type': 'application/json'}
            )
        except Exception as e:
            # Auditing must never break the request; log and move on.
            logger.error(f'Failed to send audit data to webhook: {e}')

    def _store_in_database(self, audit_data):
        """Store audit data in the SecurityAuditLog table (best effort)."""
        try:
            from .models import SecurityAuditLog
            SecurityAuditLog.objects.create(
                event_type=audit_data['event_type'],
                data=audit_data,
                timestamp=datetime.fromisoformat(audit_data['timestamp']),
                client_ip=audit_data.get('client_ip'),
                user_id=audit_data.get('user_id'),
                request_id=audit_data.get('request_id')
            )
        except Exception as e:
            # Auditing must never break the request; log and move on.
            logger.error(f'Failed to store audit data in database: {e}')
# Models for database storage
# models.py
from django.db import models
from django.contrib.auth.models import User
class SecurityAuditLog(models.Model):
    """One stored audit event (request, response, or other security event).

    ``data`` holds the full event payload as JSON; the remaining columns
    duplicate the fields that are filtered and grouped on.
    """

    EVENT_TYPES = [
        ('http_request', 'HTTP Request'),
        ('http_response', 'HTTP Response'),
        ('security_violation', 'Security Violation'),
        ('authentication', 'Authentication Event'),
    ]

    event_type = models.CharField(max_length=50, choices=EVENT_TYPES, db_index=True)
    timestamp = models.DateTimeField(db_index=True)
    # Shared by the request and response events of one HTTP exchange.
    request_id = models.UUIDField(db_index=True)
    # Nullable: the writer passes ``audit_data.get('client_ip')``, which is
    # None for events that carry no client IP; a NOT NULL column would make
    # every such insert fail. (Schema change - requires a migration.)
    client_ip = models.GenericIPAddressField(db_index=True, null=True, blank=True)
    # Kept on user deletion so the audit trail survives (SET_NULL).
    user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
    data = models.JSONField()

    class Meta:
        ordering = ['-timestamp']
        indexes = [
            models.Index(fields=['event_type', '-timestamp']),
            models.Index(fields=['client_ip', '-timestamp']),
            models.Index(fields=['user', '-timestamp']),
        ]

    def __str__(self):
        return f'{self.event_type} {self.request_id} @ {self.timestamp}'
# Security monitoring views
# views.py
from django.http import JsonResponse
from django.db.models import Count, Q
from django.utils import timezone
from datetime import timedelta
def security_dashboard(request):
    """API endpoint for the security monitoring dashboard (staff only).

    Returns a JSON object with metrics over the last 24 hours:

    - ``total_requests``: number of ``http_request`` events.
    - ``error_responses``: number of ``http_response`` events with status >= 400.
    - ``unique_ips``: distinct ``client_ip`` values across all events.
    - ``top_error_paths``: up to 10 ``{'path', 'count'}`` rows, most frequent
      first. ``path`` is read from the event's JSON payload; events whose
      payload has no ``path`` key are grouped under ``None``.
    - ``suspicious_ips``: up to 10 ``{'ip', 'error_count'}`` rows for IPs
      with more than 10 error responses.

    Non-staff users receive a 403 response.
    """
    if not request.user.is_staff:
        return JsonResponse({'error': 'Unauthorized'}, status=403)

    # Get metrics for last 24 hours
    since = timezone.now() - timedelta(hours=24)
    recent = SecurityAuditLog.objects.filter(timestamp__gte=since)
    error_responses = recent.filter(
        event_type='http_response',
        data__status_code__gte=400
    )

    # Group through the ORM's JSON key lookup instead of raw
    # ``extra(select=...)`` SQL: it is portable across database backends
    # and, unlike an extra-select alias, can be used with annotate().
    # The explicit order_by() also replaces the model's default ordering.
    top_error_paths = [
        {'path': row['data__path'], 'count': row['count']}
        for row in error_responses.values('data__path').annotate(
            count=Count('id')
        ).order_by('-count')[:10]
    ]

    suspicious_ips = [
        {'ip': row['client_ip'], 'error_count': row['error_count']}
        for row in recent.values('client_ip').annotate(
            error_count=Count('id', filter=Q(
                event_type='http_response',
                data__status_code__gte=400
            ))
        ).filter(error_count__gt=10).order_by('-error_count')[:10]
    ]

    metrics = {
        'total_requests': recent.filter(event_type='http_request').count(),
        'error_responses': error_responses.count(),
        'unique_ips': recent.values('client_ip').distinct().count(),
        'top_error_paths': top_error_paths,
        'suspicious_ips': suspicious_ips,
    }
    return JsonResponse(metrics)
Integration and Configuration
Complete Settings Configuration
# settings.py - Security configuration
import os
# Security middleware configuration
# Django applies this list top-to-bottom on the way in and bottom-to-top on
# the way out, so the position of each entry is part of its behavior.
MIDDLEWARE = [
# Security middleware stack (order matters!)
# The four custom entries below run before Django's own SecurityMiddleware,
# SessionMiddleware and CsrfViewMiddleware.
'myapp.middleware.SecurityHeadersMiddleware',
'myapp.middleware.RateLimitMiddleware',
'myapp.middleware.IPFilterMiddleware',
'myapp.middleware.RequestValidationMiddleware',
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'myapp.middleware.JWTAuthenticationMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
# Listed after the session and auth middleware: the audit middleware reads
# request.session and request.user when it logs a request.
'myapp.middleware.SecurityAuditMiddleware',
'django.middleware.common.CommonMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
# IP filtering settings
# Entries are strings: either a single address or a CIDR range
# (presumably parsed by the IP filter's _parse_ip_list - verify).
# Note that the whitelist below covers loopback plus two whole private
# ranges (10.0.0.0/8 and 192.168.0.0/16).
IP_WHITELIST = [
'127.0.0.1',
'10.0.0.0/8',
'192.168.0.0/16',
]
IP_BLACKLIST = [
# Add known malicious IPs
]
# NOTE(review): the geo check compares these against the raw text returned
# by the country lookup, so the codes here must match that format exactly
# (presumably ISO 3166-1 alpha-2, upper case - confirm).
BLOCKED_COUNTRIES = [
# Add country codes to block
]
# Threat intelligence API keys
# os.environ.get() returns None when the variable is unset, so this setting
# is always defined but may be None; code that uses it should test the
# value rather than mere presence of the attribute.
ABUSEIPDB_API_KEY = os.environ.get('ABUSEIPDB_API_KEY')
# Security audit settings
SECURITY_AUDIT_ENABLED = True
# Same caveat as above: defined even when the environment variable is unset.
AUDIT_WEBHOOK_URL = os.environ.get('AUDIT_WEBHOOK_URL')
# When True, every audit event is also written to the SecurityAuditLog table.
AUDIT_STORE_IN_DB = True
# Matched as case-insensitive substrings of field names, so a short entry
# such as 'key' also redacts fields like 'monkey' or 'keyboard_layout'.
AUDIT_SENSITIVE_FIELDS = [
'password', 'token', 'secret', 'key', 'auth', 'session',
'credit_card', 'ssn', 'social_security'
]
# Django security settings
# Redirect plain-HTTP requests to HTTPS.
SECURE_SSL_REDIRECT = True
SECURE_HSTS_SECONDS = 31536000 # 1 year
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
SECURE_CONTENT_TYPE_NOSNIFF = True
# NOTE(review): SECURE_BROWSER_XSS_FILTER was removed in Django 4.0 and has
# no effect there - confirm the target Django version.
SECURE_BROWSER_XSS_FILTER = True
X_FRAME_OPTIONS = 'DENY'
# Session security
# Cookies are HTTPS-only, hidden from JavaScript, and never sent on
# cross-site requests ('Strict').
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Strict'
SESSION_COOKIE_AGE = 3600 # 1 hour
CSRF_COOKIE_SECURE = True
# With an HttpOnly CSRF cookie, JavaScript cannot read the token from the
# cookie; AJAX code must take it from the rendered page instead.
CSRF_COOKIE_HTTPONLY = True
CSRF_COOKIE_SAMESITE = 'Strict'
# Content Security Policy
# One tuple of source expressions per CSP directive.
# NOTE(review): presumably consumed by SecurityHeadersMiddleware (or a
# django-csp style package) to build the header - confirm which.
CSP_DEFAULT_SRC = ("'self'",)
# 'unsafe-inline' permits inline scripts/styles, which weakens the XSS
# protection CSP provides; prefer nonces or hashes where possible.
CSP_SCRIPT_SRC = ("'self'", "'unsafe-inline'") # Adjust as needed
CSP_STYLE_SRC = ("'self'", "'unsafe-inline'")
CSP_IMG_SRC = ("'self'", "data:", "https:")
CSP_FONT_SRC = ("'self'",)
CSP_CONNECT_SRC = ("'self'",)
# 'none' forbids embedding the site in any frame.
CSP_FRAME_ANCESTORS = ("'none'",)
CSP_BASE_URI = ("'self'",)
CSP_FORM_ACTION = ("'self'",)
# Logging configuration
# Defines two independent channels, 'security' and 'audit', each with its
# own formatter, rotating file handler and logger. Code only reaches them
# through logging.getLogger('security') / logging.getLogger('audit')
# (or child loggers of those names).
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'security': {
'format': '[SECURITY] {levelname} {asctime} {name} {process:d} {thread:d} {message}',
'style': '{',
},
'audit': {
'format': '[AUDIT] {asctime} {message}',
'style': '{',
},
},
'handlers': {
# The file names are relative, so they resolve against the process
# working directory; the 'logs' directory must already exist when
# logging is configured, or the handler cannot open its file.
'security_file': {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'filename': 'logs/security.log',
'maxBytes': 10485760, # 10MB
'backupCount': 5,
'formatter': 'security',
},
'audit_file': {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'filename': 'logs/audit.log',
'maxBytes': 52428800, # 50MB
'backupCount': 10,
'formatter': 'audit',
},
},
'loggers': {
# propagate=False keeps these records out of the root logger's handlers.
'security': {
'handlers': ['security_file'],
'level': 'INFO',
'propagate': False,
},
'audit': {
'handlers': ['audit_file'],
'level': 'INFO',
'propagate': False,
},
},
}
Testing Your Security Middleware
Unit Tests
# tests/test_security_middleware.py
from django.test import TestCase, RequestFactory
from django.contrib.auth.models import User
from unittest.mock import patch, Mock
from myapp.middleware.jwt_auth import JWTAuthenticationMiddleware
from myapp.middleware.rate_limit import RateLimitMiddleware
class SecurityMiddlewareTests(TestCase):
    """Unit tests that call each middleware's ``security_check`` hook directly.

    Each middleware is built around a dummy ``get_response`` callable, and
    requests come from ``RequestFactory`` (no other middleware runs).
    """

    def setUp(self):
        self.factory = RequestFactory()
        self.user = User.objects.create_user(
            username='testuser',
            password='testpass123'
        )

    def test_jwt_authentication_success(self):
        """Test successful JWT authentication"""
        # Imported locally, like jwt and settings: the module-level imports
        # of this test file do not provide datetime/timedelta.
        from datetime import datetime, timedelta

        import jwt
        from django.conf import settings

        middleware = JWTAuthenticationMiddleware(lambda r: Mock())

        # Create valid JWT token
        payload = {
            'user_id': self.user.id,
            'exp': datetime.utcnow() + timedelta(hours=1),
            'iat': datetime.utcnow(),
            'token_type': 'access'
        }
        token = jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')

        request = self.factory.get(
            '/api/test/',
            HTTP_AUTHORIZATION=f'Bearer {token}'
        )
        result = middleware.security_check(request)

        self.assertIsNone(result)  # Should pass security check
        self.assertEqual(request.user, self.user)

    def test_jwt_authentication_failure(self):
        """Test JWT authentication failure"""
        middleware = JWTAuthenticationMiddleware(lambda r: Mock())

        request = self.factory.get('/api/test/')  # No token
        result = middleware.security_check(request)

        self.assertIsNotNone(result)
        self.assertEqual(result.status_code, 401)

    def test_rate_limiting(self):
        """Test rate limiting functionality"""
        middleware = RateLimitMiddleware(lambda r: Mock())

        # First request should pass
        request1 = self.factory.get('/', REMOTE_ADDR='192.168.1.1')
        result1 = middleware.security_check(request1)
        self.assertIsNone(result1)

    @patch('myapp.middleware.validation.logger')
    def test_xss_detection(self, mock_logger):
        """Test XSS pattern detection"""
        from myapp.middleware.validation import RequestValidationMiddleware

        middleware = RequestValidationMiddleware(lambda r: Mock())

        request = self.factory.get('/?search=<script>alert("xss")</script>')
        result = middleware.security_check(request)

        self.assertIsNotNone(result)
        self.assertEqual(result.status_code, 400)
        mock_logger.warning.assert_called()

    def test_ip_filtering(self):
        """Test IP filtering"""
        from myapp.middleware.ip_filter import IPFilterMiddleware

        middleware = IPFilterMiddleware(lambda r: Mock())

        # Test with blocked IP. A public documentation address (TEST-NET-3)
        # is used because the sample IP_WHITELIST contains 192.168.0.0/16.
        # NOTE(review): a whitelisted/private address could be let through
        # before the blacklist is consulted, depending on the order of
        # checks in security_check - confirm.
        request = self.factory.get('/', REMOTE_ADDR='203.0.113.10')
        with patch.object(middleware, '_is_blacklisted', return_value=True):
            result = middleware.security_check(request)

        self.assertIsNotNone(result)
        self.assertEqual(result.status_code, 403)
Conclusion
This comprehensive Django security arsenal provides multiple layers of protection:
- Authentication Security - Advanced JWT handling with blacklisting and session management
- Input Validation - XSS, SQL injection, and path traversal protection
- Rate Limiting - Multi-algorithm rate limiting with attack pattern detection
- IP Security - Geographic and threat intelligence-based filtering
- Audit Logging - Comprehensive security event monitoring
Key Benefits:
- Defense in Depth - Multiple security layers working together
- Real-time Monitoring - Immediate threat detection and response
- Scalable Architecture - Designed for high-traffic applications
- Compliance Ready - Comprehensive audit trails for compliance requirements
- Customizable - Easily adaptable to specific security requirements
Best Practices for Implementation:
- Gradual Rollout - Implement middleware incrementally
- Monitoring First - Start with audit logging before enforcement
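- Performance Testing - Measure the latency each middleware adds to a request (especially outbound threat-intelligence, geolocation, and webhook calls) and confirm it stays within your budget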
- Regular Updates - Keep threat intelligence and security rules current
- Team Training - Ensure your team understands the security architecture
Remember: Security is not a one-time implementation but an ongoing process. Regularly review your security posture, update threat patterns, and adapt to new attack vectors. The middleware architecture presented here provides a solid foundation that can evolve with your security needs.
Modern web applications face sophisticated threats, but with the right middleware stack, you can build robust defenses that protect your users and data while maintaining the performance and usability your applications demand.