Files
smoothschedule/core/middleware.py
poduck 2e111364a2 Initial commit: SmoothSchedule multi-tenant scheduling platform
This commit includes:
- Django backend with multi-tenancy (django-tenants)
- React + TypeScript frontend with Vite
- Platform administration API with role-based access control
- Authentication system with token-based auth
- Quick login dev tools for testing different user roles
- CORS and CSRF configuration for local development
- Docker development environment setup

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-27 01:43:20 -05:00

242 lines
9.1 KiB
Python

"""
Smooth Schedule Masquerade Audit Middleware
Captures and logs masquerading activity for compliance and security auditing
"""
import logging
import json
from django.utils.deprecation import MiddlewareMixin
from django.utils import timezone
logger = logging.getLogger('smoothschedule.security.masquerade')
class MasqueradeAuditMiddleware(MiddlewareMixin):
"""
Audit middleware that tracks masquerading (hijack) activity.
CRITICAL: This middleware MUST be placed AFTER HijackUserMiddleware in settings.
Responsibilities:
1. Detect when a user is being masqueraded (hijacked)
2. Extract the original admin user from session
3. Enrich request object with audit context
4. Log structured audit events
The enriched request will have:
- request.actual_user: The original admin (if masquerading)
- request.is_masquerading: Boolean flag
- request.masquerade_metadata: Dict with audit info
Example log output:
{
"timestamp": "2024-01-15T10:30:00Z",
"action": "API_CALL",
"endpoint": "/api/customers/",
"method": "GET",
"apparent_user": "customer@example.com",
"actual_user": "support@chronoflow.com",
"masquerading": true,
"ip_address": "192.168.1.1",
"user_agent": "Mozilla/5.0..."
}
"""
def process_request(self, request):
"""
Process incoming request to detect and log masquerading.
"""
# Initialize masquerade flags
request.is_masquerading = False
request.actual_user = None
request.masquerade_metadata = {}
# Check if user is authenticated
if not hasattr(request, 'user') or not request.user.is_authenticated:
return None
# Check for hijack session data
# django-hijack stores the original user ID in session['hijack_history']
hijack_history = request.session.get('hijack_history', [])
if hijack_history and len(hijack_history) > 0:
# User is being masqueraded
request.is_masquerading = True
# Extract original admin user ID from hijack history
# hijack_history is a list of user IDs: [original_user_id, ...]
original_user_id = hijack_history[0]
# Load the actual admin user
from users.models import User
try:
actual_user = User.objects.get(pk=original_user_id)
request.actual_user = actual_user
# Build metadata for audit logging
request.masquerade_metadata = {
'apparent_user_id': request.user.id,
'apparent_user_email': request.user.email,
'apparent_user_role': request.user.role,
'actual_user_id': actual_user.id,
'actual_user_email': actual_user.email,
'actual_user_role': actual_user.role,
'hijack_started_at': request.session.get('hijack_started_at'),
'session_key': request.session.session_key,
}
except User.DoesNotExist:
# Original user was deleted? This shouldn't happen but log it
logger.error(
f"Hijack session references non-existent user ID: {original_user_id}. "
f"Current user: {request.user.email}"
)
# Clear the corrupted hijack session
request.session.pop('hijack_history', None)
request.is_masquerading = False
return None
def process_view(self, request, view_func, view_args, view_kwargs):
"""
Log audit event when masquerading user accesses a view.
Only logs for authenticated, non-admin endpoints.
"""
if not request.is_masquerading:
return None
# Skip logging for admin interface (too noisy)
if request.path.startswith('/admin/'):
return None
# Skip logging for static files and media
if request.path.startswith('/static/') or request.path.startswith('/media/'):
return None
# Build structured log entry
log_entry = {
'timestamp': timezone.now().isoformat(),
'action': 'MASQUERADE_VIEW_ACCESS',
'path': request.path,
'method': request.method,
'view_name': view_func.__name__ if view_func else 'Unknown',
'apparent_user': request.user.email,
'apparent_user_role': request.user.get_role_display(),
'actual_user': request.actual_user.email if request.actual_user else 'Unknown',
'actual_user_role': request.actual_user.get_role_display() if request.actual_user else 'Unknown',
'ip_address': self._get_client_ip(request),
'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
'tenant': request.user.tenant.name if request.user.tenant else 'Platform',
}
# Log as structured JSON
logger.info(
f"Masquerade Access: {request.actual_user.email} as {request.user.email}",
extra={'audit_data': log_entry}
)
return None
def process_response(self, request, response):
"""
Add audit headers to response when masquerading (for debugging).
"""
if hasattr(request, 'is_masquerading') and request.is_masquerading:
# Add custom headers (visible in browser dev tools)
response['X-SmoothSchedule-Masquerading'] = 'true'
if request.actual_user:
response['X-SmoothSchedule-Actual-User'] = request.actual_user.email
return response
@staticmethod
def _get_client_ip(request):
"""
Extract client IP address from request, handling proxies.
"""
# Check for X-Forwarded-For header (from load balancers/proxies)
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
# Take the first IP (client IP, before proxies)
ip = x_forwarded_for.split(',')[0].strip()
else:
ip = request.META.get('REMOTE_ADDR')
return ip
class MasqueradeEventLogger:
"""
Utility class for logging masquerade lifecycle events.
Use this for logging hijack start/end events.
"""
@staticmethod
def log_hijack_start(hijacker, hijacked, request):
"""
Log when a hijack session starts.
"""
log_entry = {
'timestamp': timezone.now().isoformat(),
'action': 'HIJACK_START',
'hijacker_id': hijacker.id,
'hijacker_email': hijacker.email,
'hijacker_role': hijacker.get_role_display(),
'hijacked_id': hijacked.id,
'hijacked_email': hijacked.email,
'hijacked_role': hijacked.get_role_display(),
'ip_address': request.META.get('REMOTE_ADDR'),
'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
'session_key': request.session.session_key,
}
logger.warning(
f"HIJACK START: {hijacker.email} masquerading as {hijacked.email}",
extra={'audit_data': log_entry}
)
@staticmethod
def log_hijack_end(hijacker, hijacked, request, duration_seconds=None):
"""
Log when a hijack session ends.
"""
log_entry = {
'timestamp': timezone.now().isoformat(),
'action': 'HIJACK_END',
'hijacker_id': hijacker.id,
'hijacker_email': hijacker.email,
'hijacked_id': hijacked.id,
'hijacked_email': hijacked.email,
'duration_seconds': duration_seconds,
'ip_address': request.META.get('REMOTE_ADDR'),
'session_key': request.session.session_key,
}
logger.warning(
f"HIJACK END: {hijacker.email} stopped masquerading as {hijacked.email}",
extra={'audit_data': log_entry}
)
@staticmethod
def log_hijack_denied(hijacker, hijacked, request, reason=''):
"""
Log when a hijack attempt is denied.
"""
log_entry = {
'timestamp': timezone.now().isoformat(),
'action': 'HIJACK_DENIED',
'hijacker_id': hijacker.id,
'hijacker_email': hijacker.email,
'hijacker_role': hijacker.get_role_display(),
'attempted_hijacked_id': hijacked.id,
'attempted_hijacked_email': hijacked.email,
'attempted_hijacked_role': hijacked.get_role_display(),
'denial_reason': reason,
'ip_address': request.META.get('REMOTE_ADDR'),
'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
}
logger.error(
f"HIJACK DENIED: {hijacker.email} attempted to masquerade as {hijacked.email} - {reason}",
extra={'audit_data': log_entry}
)