Files
smoothschedule/smoothschedule/core/middleware.py
poduck a9719a5fd2 feat: Add comprehensive sandbox mode, public API system, and platform support
This commit adds major features for sandbox isolation, public API access, and platform support ticketing.

## Sandbox Mode
- Add sandbox mode toggle for businesses to test features without affecting live data
- Implement schema-based isolation for tenant data (appointments, resources, services)
- Add is_sandbox field filtering for shared models (customers, staff, tickets)
- Create sandbox middleware to detect and set sandbox mode from the API token prefix, the X-Sandbox-Mode header, or the session
- Add sandbox context and hooks for React frontend
- Display sandbox banner when in test mode
- Auto-reload page when switching between live/test modes
- Prevent platform support tickets from being created in sandbox mode

## Public API System
- Full REST API for external integrations with businesses
- API token management with sandbox/live token separation
- Test tokens (ss_test_*) show full plaintext for easy testing
- Live tokens (ss_live_*) are hashed and secure
- Security validation prevents live token plaintext storage
- Comprehensive test suite for token security
- Rate limiting and throttling per token
- Webhook support for real-time event notifications
- Scoped permissions system (read/write per resource type)
- API documentation page with interactive examples
- Token revocation with confirmation modal

## Platform Support
- Dedicated support page for businesses to contact SmoothSchedule
- View all platform support tickets in one place
- Create new support tickets with simplified interface
- Reply to existing tickets with conversation history
- Platform tickets have no admin controls (no priority/category/assignee/status)
- Internal notes hidden for platform tickets (business can't see them)
- Quick help section with links to guides and API docs
- Sandbox warning prevents ticket creation in test mode
- Business ticketing retains full admin controls (priority, assignment, internal notes)

## UI/UX Improvements
- Add notification dropdown with real-time updates
- Staff permissions UI for ticket access control
- Help dropdown in sidebar with Platform Guide, Ticketing Help, API Docs, and Support
- Update sidebar "Contact Support" to "Support" with message icon
- Fix navigation links to use React Router instead of anchor tags
- Remove unused language translations (Japanese, Portuguese, Chinese)

## Technical Details
- Sandbox middleware sets request.sandbox_mode from the API token prefix (ss_test_* / ss_live_*), the X-Sandbox-Mode header, or the session value
- ViewSets filter data by is_sandbox field
- API authentication via custom token auth class
- WebSocket support for real-time ticket updates
- Migration for sandbox fields on User, Tenant, and Ticket models
- Comprehensive documentation in SANDBOX_MODE_IMPLEMENTATION.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-28 16:44:06 -05:00

361 lines
14 KiB
Python

"""
Smooth Schedule Core Middleware
- SandboxModeMiddleware: Switches between live and sandbox schemas
- MasqueradeAuditMiddleware: Captures and logs masquerading activity
- MasqueradeEventLogger: Logs hijack start/end/denied lifecycle events
"""
import logging
import json
from django.utils.deprecation import MiddlewareMixin
from django.utils import timezone
from django.db import connection
# Audit logger used by MasqueradeAuditMiddleware and MasqueradeEventLogger.
logger = logging.getLogger('smoothschedule.security.masquerade')
# Logger used by SandboxModeMiddleware for schema-switch diagnostics.
sandbox_logger = logging.getLogger('smoothschedule.sandbox')
class SandboxModeMiddleware(MiddlewareMixin):
    """
    Middleware to switch between a tenant's live and sandbox schemas.

    Sandbox mode is requested through one of the following sources
    (highest priority first, see ``_is_sandbox_mode``):
    1. API key prefix: ``Bearer ss_test_*`` (sandbox) vs ``Bearer ss_live_*`` (live)
    2. API header: ``X-Sandbox-Mode: true`` / ``false``
    3. Session value: ``request.session['sandbox_mode']``

    CRITICAL: This middleware MUST be placed AFTER TenantMainMiddleware in
    settings, because it reads ``request.tenant``.

    When sandbox mode is active:
    - ``request.sandbox_mode`` is ``True``
    - The database connection is switched to the tenant's sandbox schema
      via ``connection.set_schema()``
    - The response carries an ``X-SmoothSchedule-Sandbox: true`` header

    The sandbox schema name is read from ``tenant.sandbox_schema_name``
    (documented convention: ``{tenant_schema_name}_sandbox``).
    """

    # Requests whose path starts with this prefix get INFO-level tracing of
    # every decision taken below (diagnostic aid for the token endpoints).
    _TRACE_PATH_PREFIX = '/api/v1/tokens'

    def process_request(self, request):
        """
        Check if sandbox mode is requested and switch schema if appropriate.

        Always sets ``request.sandbox_mode`` (``False`` unless the switch to
        the sandbox schema succeeded) and always returns ``None`` so the
        request continues down the middleware chain.
        """
        # Initialize sandbox flag so downstream code can rely on it existing.
        request.sandbox_mode = False

        # Get tenant from request (set by TenantMainMiddleware)
        tenant = getattr(request, 'tenant', None)

        # Evaluate the trace condition once instead of at every branch.
        trace = request.path.startswith(self._TRACE_PATH_PREFIX)
        if trace:
            sandbox_logger.info(f"Token endpoint: tenant={tenant}, schema={tenant.schema_name if tenant else None}")

        # Skip for public schema or if no tenant
        if not tenant or tenant.schema_name == 'public':
            if trace:
                sandbox_logger.info("Skipping: tenant is None or public")
            return None

        # Skip if sandbox is not enabled for this tenant
        sandbox_enabled = getattr(tenant, 'sandbox_enabled', False)
        if not sandbox_enabled:
            if trace:
                sandbox_logger.info(f"Skipping: sandbox_enabled={sandbox_enabled}")
            return None

        # Skip if no sandbox schema configured
        sandbox_schema = getattr(tenant, 'sandbox_schema_name', None)
        if not sandbox_schema:
            if trace:
                sandbox_logger.info("Skipping: no sandbox_schema_name")
            return None

        # Determine if sandbox mode should be active
        is_sandbox = self._is_sandbox_mode(request)
        if trace:
            sandbox_logger.info(f"_is_sandbox_mode returned: {is_sandbox}")
        if not is_sandbox:
            return None

        # Switch the database connection to the sandbox schema.
        # Note: django-tenants uses connection.set_tenant() but we need
        # to manually switch to a different schema name.
        # Only the switch itself is guarded: a failure in the bookkeeping
        # that follows must not mark the request as "live" while the
        # connection is already pointing at the sandbox schema.
        try:
            connection.set_schema(sandbox_schema)
        except Exception as e:
            # Deliberate best-effort: fall back to live mode if the sandbox
            # schema cannot be selected (request.sandbox_mode stays False).
            # NOTE(review): a request carrying an ss_test_* token then runs
            # against live data - confirm this fallback is intended.
            sandbox_logger.error(
                f"Failed to switch to sandbox schema {sandbox_schema}: {e}"
            )
            return None

        request.sandbox_mode = True
        sandbox_logger.debug(
            f"Switched to sandbox schema: {sandbox_schema} "
            f"for tenant: {getattr(tenant, 'name', tenant.schema_name)}"
        )
        return None

    def _is_sandbox_mode(self, request):
        """
        Determine if the request should use sandbox mode.

        Priority order:
        1. API token prefix (ss_test_* = sandbox, ss_live_* = live)
        2. X-Sandbox-Mode header ("true" / "false", case-insensitive)
        3. Session value ``sandbox_mode``

        Returns:
            bool: ``True`` when sandbox mode is requested, ``False`` otherwise.
        """
        # Check for API token authentication first
        auth_header = request.META.get('HTTP_AUTHORIZATION', '')
        if auth_header.startswith('Bearer ss_test_'):
            return True
        if auth_header.startswith('Bearer ss_live_'):
            return False

        # Check for explicit header; any other value falls through.
        sandbox_header = request.META.get('HTTP_X_SANDBOX_MODE', '').lower()
        if sandbox_header in ('true', 'false'):
            return sandbox_header == 'true'

        # Fall back to session value (if session is available).
        # Session may not be available if this middleware runs before
        # SessionMiddleware. Test identity rather than truthiness so the
        # check does not depend on how the session object defines bool().
        session = getattr(request, 'session', None)
        if session is None:
            return False
        # Coerce so callers always receive a real bool, whatever was stored.
        return bool(session.get('sandbox_mode', False))

    def process_response(self, request, response):
        """
        Add sandbox mode indicator to response headers.

        Sets ``X-SmoothSchedule-Sandbox: true`` when the request was served
        in sandbox mode; the response is returned unchanged otherwise.
        """
        # getattr guard: process_request may not have run for this request.
        if getattr(request, 'sandbox_mode', False):
            response['X-SmoothSchedule-Sandbox'] = 'true'
        return response
class MasqueradeAuditMiddleware(MiddlewareMixin):
    """
    Audit middleware that tracks masquerading (hijack) activity.

    CRITICAL: This middleware MUST be placed AFTER HijackUserMiddleware in settings.

    Responsibilities:
    1. Detect when a user is being masqueraded (hijacked)
    2. Extract the original admin user from session
    3. Enrich request object with audit context
    4. Log structured audit events

    The enriched request will have:
    - request.actual_user: The original admin (if masquerading)
    - request.is_masquerading: Boolean flag
    - request.masquerade_metadata: Dict with audit info

    Example ``audit_data`` attached to each log record (values illustrative):
    {
        "timestamp": "2024-01-15T10:30:00+00:00",
        "action": "MASQUERADE_VIEW_ACCESS",
        "path": "/api/customers/",
        "method": "GET",
        "view_name": "CustomerViewSet",
        "apparent_user": "customer@example.com",
        "apparent_user_role": "Customer",
        "actual_user": "support@example.com",
        "actual_user_role": "Platform Support",
        "ip_address": "192.168.1.1",
        "user_agent": "Mozilla/5.0...",
        "tenant": "Example Business"
    }
    """

    def process_request(self, request):
        """
        Process incoming request to detect masquerading.

        Always initialises ``request.is_masquerading``, ``request.actual_user``
        and ``request.masquerade_metadata``; fills them in when the session
        carries a hijack history whose first entry resolves to a user.
        Returns ``None`` so the request continues down the chain.
        """
        # Initialize masquerade flags
        request.is_masquerading = False
        request.actual_user = None
        request.masquerade_metadata = {}

        # Check if user is authenticated
        if not hasattr(request, 'user') or not request.user.is_authenticated:
            return None

        # Check for hijack session data
        # django-hijack stores the original user ID in session['hijack_history']
        hijack_history = request.session.get('hijack_history', [])
        if not hijack_history:
            return None

        # Extract original admin user ID from hijack history
        # hijack_history is a list of user IDs: [original_user_id, ...]
        original_user_id = hijack_history[0]

        # Imported here rather than at module level, as in the original code
        # (presumably to avoid loading models at middleware import time).
        from users.models import User
        try:
            actual_user = User.objects.get(pk=original_user_id)
        except User.DoesNotExist:
            # Original user was deleted? This shouldn't happen but log it
            logger.error(
                f"Hijack session references non-existent user ID: {original_user_id}. "
                f"Current user: {request.user.email}"
            )
            # Clear the corrupted hijack session; flags stay at their
            # non-masquerading defaults.
            request.session.pop('hijack_history', None)
            return None

        # User is being masqueraded
        request.is_masquerading = True
        request.actual_user = actual_user
        # Build metadata for audit logging
        request.masquerade_metadata = {
            'apparent_user_id': request.user.id,
            'apparent_user_email': request.user.email,
            'apparent_user_role': request.user.role,
            'actual_user_id': actual_user.id,
            'actual_user_email': actual_user.email,
            'actual_user_role': actual_user.role,
            'hijack_started_at': request.session.get('hijack_started_at'),
            'session_key': request.session.session_key,
        }
        return None

    def process_view(self, request, view_func, view_args, view_kwargs):
        """
        Log audit event when masquerading user accesses a view.

        Skips the admin interface and static/media paths. Returns ``None``
        so view processing continues normally.
        """
        # getattr guard mirrors process_response, so a request that skipped
        # process_request does not raise AttributeError here.
        if not getattr(request, 'is_masquerading', False):
            return None

        # Skip logging for admin interface (too noisy) and for static
        # files and media.
        if request.path.startswith(('/admin/', '/static/', '/media/')):
            return None

        # Resolve the real user once so the log message and the structured
        # entry apply the same None handling.
        actual_user = getattr(request, 'actual_user', None)
        actual_email = actual_user.email if actual_user else 'Unknown'
        actual_role = actual_user.get_role_display() if actual_user else 'Unknown'

        # Build structured log entry
        log_entry = {
            'timestamp': timezone.now().isoformat(),
            'action': 'MASQUERADE_VIEW_ACCESS',
            'path': request.path,
            'method': request.method,
            # Not every callable has __name__ (e.g. functools.partial).
            'view_name': getattr(view_func, '__name__', 'Unknown') if view_func else 'Unknown',
            'apparent_user': request.user.email,
            'apparent_user_role': request.user.get_role_display(),
            'actual_user': actual_email,
            'actual_user_role': actual_role,
            'ip_address': self._get_client_ip(request),
            'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
            'tenant': request.user.tenant.name if request.user.tenant else 'Platform',
        }

        # Attach the structured entry to the record via ``extra`` so a
        # formatter/handler can serialise it.
        logger.info(
            f"Masquerade Access: {actual_email} as {request.user.email}",
            extra={'audit_data': log_entry}
        )
        return None

    def process_response(self, request, response):
        """
        Add audit headers to response when masquerading (for debugging).

        NOTE(review): ``X-SmoothSchedule-Actual-User`` exposes the admin's
        email address to the client - confirm this is acceptable outside
        of debugging.
        """
        if getattr(request, 'is_masquerading', False):
            # Add custom headers (visible in browser dev tools)
            response['X-SmoothSchedule-Masquerading'] = 'true'
            if request.actual_user:
                response['X-SmoothSchedule-Actual-User'] = request.actual_user.email
        return response

    @staticmethod
    def _get_client_ip(request):
        """
        Extract client IP address from request, handling proxies.

        Returns the first entry of ``X-Forwarded-For`` when that header is
        present, otherwise ``REMOTE_ADDR`` (which may be ``None``).

        NOTE(review): ``X-Forwarded-For`` is supplied by the client unless a
        trusted proxy overwrites it - verify the deployment sanitises it.
        """
        # Check for X-Forwarded-For header (from load balancers/proxies)
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # Take the first IP (client IP, before proxies)
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')
class MasqueradeEventLogger:
    """
    Utility class for logging masquerade lifecycle events.

    Use this for logging hijack start/end/denied events. Every method
    builds a structured entry and attaches it to the log record as
    ``extra={'audit_data': ...}``.

    Client IPs are resolved with ``MasqueradeAuditMiddleware._get_client_ip``
    so lifecycle events and per-view audit events record the same address
    for the same client (including behind a proxy/load balancer).
    """

    @staticmethod
    def log_hijack_start(hijacker, hijacked, request):
        """
        Log when a hijack session starts (WARNING level).

        Args:
            hijacker: The user who starts masquerading.
            hijacked: The user being masqueraded as.
            request: The current request; supplies IP, user agent and session key.
        """
        log_entry = {
            'timestamp': timezone.now().isoformat(),
            'action': 'HIJACK_START',
            'hijacker_id': hijacker.id,
            'hijacker_email': hijacker.email,
            'hijacker_role': hijacker.get_role_display(),
            'hijacked_id': hijacked.id,
            'hijacked_email': hijacked.email,
            'hijacked_role': hijacked.get_role_display(),
            'ip_address': MasqueradeAuditMiddleware._get_client_ip(request),
            # Truncated to keep log records bounded.
            'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
            'session_key': request.session.session_key,
        }
        logger.warning(
            f"HIJACK START: {hijacker.email} masquerading as {hijacked.email}",
            extra={'audit_data': log_entry}
        )

    @staticmethod
    def log_hijack_end(hijacker, hijacked, request, duration_seconds=None):
        """
        Log when a hijack session ends (WARNING level).

        Args:
            hijacker: The user who was masquerading.
            hijacked: The user that was masqueraded as.
            request: The current request; supplies IP and session key.
            duration_seconds: Optional length of the hijack session; logged
                as given (``None`` when the caller does not provide it).
        """
        log_entry = {
            'timestamp': timezone.now().isoformat(),
            'action': 'HIJACK_END',
            'hijacker_id': hijacker.id,
            'hijacker_email': hijacker.email,
            'hijacked_id': hijacked.id,
            'hijacked_email': hijacked.email,
            'duration_seconds': duration_seconds,
            'ip_address': MasqueradeAuditMiddleware._get_client_ip(request),
            'session_key': request.session.session_key,
        }
        logger.warning(
            f"HIJACK END: {hijacker.email} stopped masquerading as {hijacked.email}",
            extra={'audit_data': log_entry}
        )

    @staticmethod
    def log_hijack_denied(hijacker, hijacked, request, reason=''):
        """
        Log when a hijack attempt is denied (ERROR level).

        Args:
            hijacker: The user who attempted to masquerade.
            hijacked: The user they attempted to masquerade as.
            request: The current request; supplies IP and user agent.
            reason: Free-text explanation of why the attempt was denied.
        """
        log_entry = {
            'timestamp': timezone.now().isoformat(),
            'action': 'HIJACK_DENIED',
            'hijacker_id': hijacker.id,
            'hijacker_email': hijacker.email,
            'hijacker_role': hijacker.get_role_display(),
            'attempted_hijacked_id': hijacked.id,
            'attempted_hijacked_email': hijacked.email,
            'attempted_hijacked_role': hijacked.get_role_display(),
            'denial_reason': reason,
            'ip_address': MasqueradeAuditMiddleware._get_client_ip(request),
            # Truncated to keep log records bounded.
            'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
        }
        logger.error(
            f"HIJACK DENIED: {hijacker.email} attempted to masquerade as {hijacked.email} - {reason}",
            extra={'audit_data': log_entry}
        )