Files
smoothschedule/smoothschedule/core/mixins.py
poduck 410b46a896 feat: Add time block approval workflow and staff permission system
- Add TimeBlock approval status with manager approval workflow
- Create core mixins for staff permission restrictions (DenyStaffWritePermission, etc.)
- Add StaffDashboard page for staff-specific views
- Refactor MyAvailability page for time block management
- Update field mobile status machine and views
- Add per-user permission overrides via JSONField
- Document core mixins and permission system in CLAUDE.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-07 17:49:37 -05:00

547 lines
19 KiB
Python

"""
Core Mixins for DRF ViewSets
Reusable mixins to reduce code duplication across ViewSets.
"""
from rest_framework.permissions import BasePermission
from rest_framework.exceptions import PermissionDenied
# ==============================================================================
# Permission Classes
# ==============================================================================
def _staff_has_permission_override(user, permission_key):
"""
Check if a staff member has a per-user permission override.
Staff members can be granted specific permissions via user.permissions JSONField.
This allows owners/managers to grant individual staff access to normally restricted areas.
Args:
user: The user to check
permission_key: The permission key to check (e.g., 'can_access_resources')
Returns:
bool: True if user has the permission override
"""
if not user.is_authenticated:
return False
permissions = getattr(user, 'permissions', {}) or {}
return permissions.get(permission_key, False)
class DenyStaffWritePermission(BasePermission):
"""
Permission class that denies write operations for staff members.
Use this instead of manually checking user.role in each view method.
Staff can still perform read operations (GET, HEAD, OPTIONS).
Per-user override: Set user.permissions['can_write_<resource>'] = True
where <resource> is derived from the view's basename or model name.
Usage:
class ResourceViewSet(ModelViewSet):
permission_classes = [IsAuthenticated, DenyStaffWritePermission]
# Optional: specify custom permission key
staff_write_permission_key = 'can_edit_resources'
"""
message = "Staff members do not have access to this resource."
def has_permission(self, request, view):
# Allow read operations
if request.method in ['GET', 'HEAD', 'OPTIONS']:
return True
# Check if user is staff
from smoothschedule.users.models import User
if request.user.is_authenticated and request.user.role == User.Role.TENANT_STAFF:
# Check for per-user permission override
permission_key = self._get_permission_key(view, 'write')
if _staff_has_permission_override(request.user, permission_key):
return True
return False
return True
def _get_permission_key(self, view, action_type):
"""Get the permission key to check for overrides."""
# First check if view specifies a custom key
custom_key = getattr(view, f'staff_{action_type}_permission_key', None)
if custom_key:
return custom_key
# Otherwise derive from view basename or model
basename = getattr(view, 'basename', None)
if basename:
return f'can_{action_type}_{basename}'
# Fallback to model name
queryset = getattr(view, 'queryset', None)
if queryset is not None:
model_name = queryset.model._meta.model_name
return f'can_{action_type}_{model_name}s'
return f'can_{action_type}_resource'
class DenyStaffAllAccessPermission(BasePermission):
"""
Permission class that denies ALL operations for staff members.
Use this for endpoints where staff should not have any access,
not even read access (e.g., services, resources).
Per-user override: Set user.permissions['can_access_<resource>'] = True
where <resource> is derived from the view's basename or model name.
Usage:
class ServiceViewSet(ModelViewSet):
permission_classes = [IsAuthenticated, DenyStaffAllAccessPermission]
# Optional: specify custom permission key
staff_access_permission_key = 'can_access_services'
"""
message = "Staff members do not have access to this resource."
def has_permission(self, request, view):
from smoothschedule.users.models import User
if request.user.is_authenticated and request.user.role == User.Role.TENANT_STAFF:
# Check for per-user permission override
permission_key = self._get_permission_key(view)
if _staff_has_permission_override(request.user, permission_key):
return True
return False
return True
def _get_permission_key(self, view):
"""Get the permission key to check for overrides."""
# First check if view specifies a custom key
custom_key = getattr(view, 'staff_access_permission_key', None)
if custom_key:
return custom_key
# Otherwise derive from view basename or model
basename = getattr(view, 'basename', None)
if basename:
return f'can_access_{basename}'
# Fallback to model name
queryset = getattr(view, 'queryset', None)
if queryset is not None:
model_name = queryset.model._meta.model_name
return f'can_access_{model_name}s'
return 'can_access_resource'
class DenyStaffListPermission(BasePermission):
"""
Permission class that denies list/create/update/delete for staff members.
Staff can still retrieve individual objects (useful for customer details
on appointments where staff need name/address but not full list).
Per-user overrides:
- user.permissions['can_list_<resource>'] = True (allows list action)
- user.permissions['can_access_<resource>'] = True (allows all actions)
Usage:
class CustomerViewSet(ModelViewSet):
permission_classes = [IsAuthenticated, DenyStaffListPermission]
# Optional: specify custom permission keys
staff_list_permission_key = 'can_list_customers'
staff_access_permission_key = 'can_access_customers'
"""
message = "Staff members do not have access to this resource."
def has_permission(self, request, view):
from smoothschedule.users.models import User
# Allow retrieve (detail view) for staff
if view.action == 'retrieve':
return True
# Deny all other actions for staff (unless they have override)
if request.user.is_authenticated and request.user.role == User.Role.TENANT_STAFF:
if view.action in ['list', 'create', 'update', 'partial_update', 'destroy']:
# Check for full access override
access_key = self._get_permission_key(view, 'access')
if _staff_has_permission_override(request.user, access_key):
return True
# Check for list-only override (for list action)
if view.action == 'list':
list_key = self._get_permission_key(view, 'list')
if _staff_has_permission_override(request.user, list_key):
return True
return False
return True
def _get_permission_key(self, view, action_type):
"""Get the permission key to check for overrides."""
# First check if view specifies a custom key
custom_key = getattr(view, f'staff_{action_type}_permission_key', None)
if custom_key:
return custom_key
# Otherwise derive from view basename or model
basename = getattr(view, 'basename', None)
if basename:
return f'can_{action_type}_{basename}'
# Fallback to model name
queryset = getattr(view, 'queryset', None)
if queryset is not None:
model_name = queryset.model._meta.model_name
return f'can_{action_type}_{model_name}s'
return f'can_{action_type}_resource'
# ==============================================================================
# QuerySet Mixins
# ==============================================================================
class TenantFilteredQuerySetMixin:
"""
Mixin that filters querysets by tenant and validates user access.
Provides standardized tenant validation that was previously duplicated
across 10+ ViewSets. Use as the first mixin in your class definition.
Features:
- Validates user is authenticated
- Validates user belongs to request tenant
- Returns empty queryset for invalid access
Usage:
class ResourceViewSet(TenantFilteredQuerySetMixin, ModelViewSet):
queryset = Resource.objects.all()
# ... rest of viewset
Override `filter_queryset_for_tenant` for custom filtering logic.
"""
# Set to True to deny staff access entirely (returns empty queryset)
deny_staff_queryset = False
def get_queryset(self):
"""
Filter queryset by tenant with security validation.
CRITICAL: This validates that the user belongs to the current tenant
and prevents cross-tenant data access.
"""
queryset = super().get_queryset()
user = self.request.user
# Unauthenticated users get empty queryset
if not user.is_authenticated:
return queryset.none()
# Optionally deny staff access at queryset level
if self.deny_staff_queryset:
from smoothschedule.users.models import User
if user.role == User.Role.TENANT_STAFF:
return queryset.none()
# Validate user belongs to the current tenant
request_tenant = getattr(self.request, 'tenant', None)
if user.tenant and request_tenant:
if user.tenant.schema_name != request_tenant.schema_name:
# User is accessing a tenant they don't belong to
return queryset.none()
# Apply any custom filtering
return self.filter_queryset_for_tenant(queryset)
def filter_queryset_for_tenant(self, queryset):
"""
Override this method for custom tenant filtering logic.
By default, returns the queryset unchanged (django-tenants handles
the actual tenant scoping for most models).
"""
return queryset
class SandboxFilteredQuerySetMixin(TenantFilteredQuerySetMixin):
"""
Mixin that adds sandbox mode filtering on top of tenant filtering.
For models with `is_sandbox` field, this filters based on the
request's sandbox_mode (set by middleware).
Usage:
class CustomerViewSet(SandboxFilteredQuerySetMixin, ModelViewSet):
queryset = User.objects.filter(role=User.Role.CUSTOMER)
"""
def filter_queryset_for_tenant(self, queryset):
"""Filter by sandbox mode if the model supports it."""
queryset = super().filter_queryset_for_tenant(queryset)
# Check if model has is_sandbox field
model = queryset.model
if hasattr(model, 'is_sandbox'):
is_sandbox = getattr(self.request, 'sandbox_mode', False)
queryset = queryset.filter(is_sandbox=is_sandbox)
return queryset
class UserTenantFilteredMixin(SandboxFilteredQuerySetMixin):
"""
Mixin for ViewSets that query User model (which is in shared schema).
Since User model uses django-tenants shared schema, it needs explicit
tenant filtering via the `tenant` foreign key.
Usage:
class CustomerViewSet(UserTenantFilteredMixin, ModelViewSet):
queryset = User.objects.filter(role=User.Role.CUSTOMER)
"""
def filter_queryset_for_tenant(self, queryset):
"""Filter users by tenant foreign key."""
queryset = super().filter_queryset_for_tenant(queryset)
user = self.request.user
if user.tenant:
queryset = queryset.filter(tenant=user.tenant)
else:
# User has no tenant - return empty for safety
return queryset.none()
return queryset
# ==============================================================================
# Feature Permission Mixins
# ==============================================================================
class PluginFeatureRequiredMixin:
"""
Mixin that checks plugin permission before allowing access.
Raises PermissionDenied if tenant doesn't have 'can_use_plugins' feature.
Usage:
class PluginTemplateViewSet(PluginFeatureRequiredMixin, ModelViewSet):
# ...
"""
plugin_feature_key = 'can_use_plugins'
plugin_feature_error = (
"Your current plan does not include Plugin access. "
"Please upgrade your subscription to use plugins."
)
def check_plugin_permission(self):
"""Check if tenant has plugin permission."""
tenant = getattr(self.request, 'tenant', None)
if tenant and not tenant.has_feature(self.plugin_feature_key):
raise PermissionDenied(self.plugin_feature_error)
def list(self, request, *args, **kwargs):
self.check_plugin_permission()
return super().list(request, *args, **kwargs)
def retrieve(self, request, *args, **kwargs):
self.check_plugin_permission()
return super().retrieve(request, *args, **kwargs)
def create(self, request, *args, **kwargs):
self.check_plugin_permission()
return super().create(request, *args, **kwargs)
class TaskFeatureRequiredMixin(PluginFeatureRequiredMixin):
"""
Mixin that checks both plugin and task permissions.
Requires both 'can_use_plugins' AND 'can_use_tasks' features.
"""
def check_plugin_permission(self):
"""Check both plugin and task permissions."""
super().check_plugin_permission()
tenant = getattr(self.request, 'tenant', None)
if tenant and not tenant.has_feature('can_use_tasks'):
raise PermissionDenied(
"Your current plan does not include Scheduled Tasks. "
"Please upgrade your subscription to use scheduled tasks."
)
# ==============================================================================
# Response Helper Mixin
# ==============================================================================
class StandardResponseMixin:
"""
Mixin that provides standardized response helpers.
Reduces boilerplate for common response patterns.
"""
def success_response(self, message, data=None, status_code=200):
"""Return a success response with optional data."""
from rest_framework.response import Response
from rest_framework import status as drf_status
response_data = {'message': message}
if data:
response_data.update(data)
status_map = {
200: drf_status.HTTP_200_OK,
201: drf_status.HTTP_201_CREATED,
204: drf_status.HTTP_204_NO_CONTENT,
}
return Response(response_data, status=status_map.get(status_code, status_code))
def error_response(self, error, status_code=400):
"""Return an error response."""
from rest_framework.response import Response
from rest_framework import status as drf_status
status_map = {
400: drf_status.HTTP_400_BAD_REQUEST,
403: drf_status.HTTP_403_FORBIDDEN,
404: drf_status.HTTP_404_NOT_FOUND,
500: drf_status.HTTP_500_INTERNAL_SERVER_ERROR,
}
return Response({'error': error}, status=status_map.get(status_code, status_code))
# ==============================================================================
# Base API Views
# ==============================================================================
class TenantAPIView:
"""
Base class for tenant-aware APIViews.
Provides common functionality for views that require tenant context:
- Automatic tenant retrieval from request
- Standard error responses
- Helper methods for common patterns
Usage:
from rest_framework.views import APIView
from core.mixins import TenantAPIView
class MyView(TenantAPIView, APIView):
permission_classes = [IsAuthenticated]
def get(self, request):
tenant = self.get_tenant()
if not tenant:
return self.tenant_required_response()
# ... rest of implementation
"""
def get_tenant(self):
"""Get tenant from request. Returns None if not available."""
return getattr(self.request, 'tenant', None)
def get_tenant_or_error(self):
"""
Get tenant from request, returning error response if not available.
Returns:
tuple: (tenant, error_response) - error_response is None if tenant exists
"""
tenant = self.get_tenant()
if not tenant:
return None, self.tenant_required_response()
return tenant, None
def tenant_required_response(self):
"""Return standard error response when tenant is required but missing."""
from rest_framework.response import Response
from rest_framework import status
return Response(
{'error': 'Tenant context required'},
status=status.HTTP_400_BAD_REQUEST
)
def error_response(self, error, status_code=400):
"""Return a standardized error response."""
from rest_framework.response import Response
from rest_framework import status as drf_status
status_map = {
400: drf_status.HTTP_400_BAD_REQUEST,
403: drf_status.HTTP_403_FORBIDDEN,
404: drf_status.HTTP_404_NOT_FOUND,
500: drf_status.HTTP_500_INTERNAL_SERVER_ERROR,
}
return Response({'error': error}, status=status_map.get(status_code, status_code))
def success_response(self, data, status_code=200):
"""Return a standardized success response."""
from rest_framework.response import Response
from rest_framework import status as drf_status
status_map = {
200: drf_status.HTTP_200_OK,
201: drf_status.HTTP_201_CREATED,
}
return Response(data, status=status_map.get(status_code, status_code))
def check_feature(self, feature_key, feature_name=None):
"""
Check if tenant has a feature, return error response if not.
Args:
feature_key: The feature permission key (e.g., 'can_accept_payments')
feature_name: Human-readable name for error message (optional)
Returns:
Response or None: Error response if feature not available, None if OK
"""
tenant = self.get_tenant()
if not tenant:
return None # Let other checks handle missing tenant
if not tenant.has_feature(feature_key):
name = feature_name or feature_key.replace('can_', '').replace('_', ' ').title()
return self.error_response(
f"Your current plan does not include {name}. "
"Please upgrade your subscription to access this feature.",
status_code=403
)
return None
class TenantRequiredAPIView(TenantAPIView):
"""
Base class for views that require tenant context.
Automatically checks for tenant in dispatch and returns error if missing.
Subclasses can assume self.tenant is always available.
Usage:
class MyView(TenantRequiredAPIView, APIView):
def get(self, request):
# self.tenant is guaranteed to exist here
return Response({'name': self.tenant.name})
"""
tenant = None # Will be set in dispatch
def dispatch(self, request, *args, **kwargs):
"""Check tenant before dispatching to handler."""
self.tenant = getattr(request, 'tenant', None)
if not self.tenant:
return self.tenant_required_response()
return super().dispatch(request, *args, **kwargs)