Initial commit: SmoothSchedule multi-tenant scheduling platform

This commit includes:
- Django backend with multi-tenancy (django-tenants)
- React + TypeScript frontend with Vite
- Platform administration API with role-based access control
- Authentication system with token-based auth
- Quick login dev tools for testing different user roles
- CORS and CSRF configuration for local development
- Docker development environment setup

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
poduck
2025-11-27 01:43:20 -05:00
commit 2e111364a2
567 changed files with 96410 additions and 0 deletions

2
core/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# Core app initialization
default_app_config = 'core.apps.CoreConfig'

237
core/admin.py Normal file
View File

@@ -0,0 +1,237 @@
"""
Smooth Schedule Core App Admin Configuration
"""
from django.contrib import admin
from django.utils.html import format_html
from django.urls import reverse
from django_tenants.admin import TenantAdminMixin
from .models import Tenant, Domain, PermissionGrant
@admin.register(Tenant)
class TenantAdmin(TenantAdminMixin, admin.ModelAdmin):
"""
Admin interface for Tenant management.
"""
list_display = [
'name',
'schema_name',
'subscription_tier',
'is_active',
'created_on',
'user_count',
'domain_list',
]
list_filter = [
'is_active',
'subscription_tier',
'created_on',
]
search_fields = [
'name',
'schema_name',
'contact_email',
]
readonly_fields = [
'schema_name',
'created_on',
]
fieldsets = (
('Basic Information', {
'fields': ('name', 'schema_name', 'created_on')
}),
('Subscription', {
'fields': ('subscription_tier', 'is_active', 'max_users', 'max_resources')
}),
('Contact', {
'fields': ('contact_email', 'phone')
}),
)
def user_count(self, obj):
"""Display count of users in this tenant"""
count = obj.users.count()
return format_html(
'<span style="color: {};">{}</span>',
'green' if count < obj.max_users else 'red',
count
)
user_count.short_description = 'Users'
def domain_list(self, obj):
"""Display list of domains for this tenant"""
domains = obj.domain_set.all()
if not domains:
return '-'
domain_links = []
for domain in domains:
url = reverse('admin:core_domain_change', args=[domain.pk])
domain_links.append(f'<a href="{url}">{domain.domain}</a>')
return format_html(' | '.join(domain_links))
domain_list.short_description = 'Domains'
@admin.register(Domain)
class DomainAdmin(admin.ModelAdmin):
"""
Admin interface for Domain management.
"""
list_display = [
'domain',
'tenant',
'is_primary',
'is_custom_domain',
'verified_status',
]
list_filter = [
'is_primary',
'is_custom_domain',
]
search_fields = [
'domain',
'tenant__name',
]
readonly_fields = [
'verified_at',
]
fieldsets = (
('Domain Information', {
'fields': ('domain', 'tenant', 'is_primary')
}),
('Custom Domain Settings', {
'fields': (
'is_custom_domain',
'route53_zone_id',
'route53_record_set_id',
'ssl_certificate_arn',
'verified_at',
),
'classes': ('collapse',),
}),
)
def verified_status(self, obj):
"""Display verification status with color coding"""
if obj.is_verified():
return format_html(
'<span style="color: green;">✓ Verified</span>'
)
else:
return format_html(
'<span style="color: orange;">⚠ Pending</span>'
)
verified_status.short_description = 'Status'
@admin.register(PermissionGrant)
class PermissionGrantAdmin(admin.ModelAdmin):
"""
Admin interface for Permission Grant management.
"""
list_display = [
'id',
'grantor',
'grantee',
'action',
'granted_at',
'expires_at',
'status',
'time_left',
]
list_filter = [
'action',
'granted_at',
'expires_at',
]
search_fields = [
'grantor__email',
'grantee__email',
'action',
'reason',
]
readonly_fields = [
'granted_at',
'grantor',
'grantee',
'ip_address',
'user_agent',
]
fieldsets = (
('Grant Information', {
'fields': ('grantor', 'grantee', 'action', 'reason')
}),
('Timing', {
'fields': ('granted_at', 'expires_at', 'revoked_at')
}),
('Audit Trail', {
'fields': ('ip_address', 'user_agent'),
'classes': ('collapse',),
}),
)
def status(self, obj):
"""Display status with color coding"""
if obj.revoked_at:
return format_html(
'<span style="color: red;">✗ Revoked</span>'
)
elif obj.is_active():
return format_html(
'<span style="color: green;">✓ Active</span>'
)
else:
return format_html(
'<span style="color: gray;">⊘ Expired</span>'
)
status.short_description = 'Status'
def time_left(self, obj):
"""Display remaining time"""
remaining = obj.time_remaining()
if remaining is None:
return '-'
minutes = int(remaining.total_seconds() / 60)
if minutes < 5:
color = 'red'
elif minutes < 15:
color = 'orange'
else:
color = 'green'
return format_html(
'<span style="color: {};">{} min</span>',
color,
minutes
)
time_left.short_description = 'Time Left'
actions = ['revoke_grants']
def revoke_grants(self, request, queryset):
"""Admin action to revoke permission grants"""
count = 0
for grant in queryset:
if grant.is_active():
grant.revoke()
count += 1
self.message_user(
request,
f'Successfully revoked {count} permission grant(s).'
)
revoke_grants.short_description = 'Revoke selected grants'

18
core/apps.py Normal file
View File

@@ -0,0 +1,18 @@
"""
Smooth Schedule Core App Configuration
"""
from django.apps import AppConfig
class CoreConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'core'
verbose_name = 'Smooth Schedule Core'
def ready(self):
"""
Import signals and perform app initialization.
"""
# Import signals here when needed
# from . import signals
pass

241
core/middleware.py Normal file
View File

@@ -0,0 +1,241 @@
"""
Smooth Schedule Masquerade Audit Middleware
Captures and logs masquerading activity for compliance and security auditing
"""
import logging
import json
from django.utils.deprecation import MiddlewareMixin
from django.utils import timezone
logger = logging.getLogger('smoothschedule.security.masquerade')
class MasqueradeAuditMiddleware(MiddlewareMixin):
"""
Audit middleware that tracks masquerading (hijack) activity.
CRITICAL: This middleware MUST be placed AFTER HijackUserMiddleware in settings.
Responsibilities:
1. Detect when a user is being masqueraded (hijacked)
2. Extract the original admin user from session
3. Enrich request object with audit context
4. Log structured audit events
The enriched request will have:
- request.actual_user: The original admin (if masquerading)
- request.is_masquerading: Boolean flag
- request.masquerade_metadata: Dict with audit info
Example log output:
{
"timestamp": "2024-01-15T10:30:00Z",
"action": "API_CALL",
"endpoint": "/api/customers/",
"method": "GET",
"apparent_user": "customer@example.com",
"actual_user": "support@chronoflow.com",
"masquerading": true,
"ip_address": "192.168.1.1",
"user_agent": "Mozilla/5.0..."
}
"""
def process_request(self, request):
"""
Process incoming request to detect and log masquerading.
"""
# Initialize masquerade flags
request.is_masquerading = False
request.actual_user = None
request.masquerade_metadata = {}
# Check if user is authenticated
if not hasattr(request, 'user') or not request.user.is_authenticated:
return None
# Check for hijack session data
# django-hijack stores the original user ID in session['hijack_history']
hijack_history = request.session.get('hijack_history', [])
if hijack_history and len(hijack_history) > 0:
# User is being masqueraded
request.is_masquerading = True
# Extract original admin user ID from hijack history
# hijack_history is a list of user IDs: [original_user_id, ...]
original_user_id = hijack_history[0]
# Load the actual admin user
from users.models import User
try:
actual_user = User.objects.get(pk=original_user_id)
request.actual_user = actual_user
# Build metadata for audit logging
request.masquerade_metadata = {
'apparent_user_id': request.user.id,
'apparent_user_email': request.user.email,
'apparent_user_role': request.user.role,
'actual_user_id': actual_user.id,
'actual_user_email': actual_user.email,
'actual_user_role': actual_user.role,
'hijack_started_at': request.session.get('hijack_started_at'),
'session_key': request.session.session_key,
}
except User.DoesNotExist:
# Original user was deleted? This shouldn't happen but log it
logger.error(
f"Hijack session references non-existent user ID: {original_user_id}. "
f"Current user: {request.user.email}"
)
# Clear the corrupted hijack session
request.session.pop('hijack_history', None)
request.is_masquerading = False
return None
def process_view(self, request, view_func, view_args, view_kwargs):
"""
Log audit event when masquerading user accesses a view.
Only logs for authenticated, non-admin endpoints.
"""
if not request.is_masquerading:
return None
# Skip logging for admin interface (too noisy)
if request.path.startswith('/admin/'):
return None
# Skip logging for static files and media
if request.path.startswith('/static/') or request.path.startswith('/media/'):
return None
# Build structured log entry
log_entry = {
'timestamp': timezone.now().isoformat(),
'action': 'MASQUERADE_VIEW_ACCESS',
'path': request.path,
'method': request.method,
'view_name': view_func.__name__ if view_func else 'Unknown',
'apparent_user': request.user.email,
'apparent_user_role': request.user.get_role_display(),
'actual_user': request.actual_user.email if request.actual_user else 'Unknown',
'actual_user_role': request.actual_user.get_role_display() if request.actual_user else 'Unknown',
'ip_address': self._get_client_ip(request),
'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
'tenant': request.user.tenant.name if request.user.tenant else 'Platform',
}
# Log as structured JSON
logger.info(
f"Masquerade Access: {request.actual_user.email} as {request.user.email}",
extra={'audit_data': log_entry}
)
return None
def process_response(self, request, response):
"""
Add audit headers to response when masquerading (for debugging).
"""
if hasattr(request, 'is_masquerading') and request.is_masquerading:
# Add custom headers (visible in browser dev tools)
response['X-SmoothSchedule-Masquerading'] = 'true'
if request.actual_user:
response['X-SmoothSchedule-Actual-User'] = request.actual_user.email
return response
@staticmethod
def _get_client_ip(request):
"""
Extract client IP address from request, handling proxies.
"""
# Check for X-Forwarded-For header (from load balancers/proxies)
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
# Take the first IP (client IP, before proxies)
ip = x_forwarded_for.split(',')[0].strip()
else:
ip = request.META.get('REMOTE_ADDR')
return ip
class MasqueradeEventLogger:
"""
Utility class for logging masquerade lifecycle events.
Use this for logging hijack start/end events.
"""
@staticmethod
def log_hijack_start(hijacker, hijacked, request):
"""
Log when a hijack session starts.
"""
log_entry = {
'timestamp': timezone.now().isoformat(),
'action': 'HIJACK_START',
'hijacker_id': hijacker.id,
'hijacker_email': hijacker.email,
'hijacker_role': hijacker.get_role_display(),
'hijacked_id': hijacked.id,
'hijacked_email': hijacked.email,
'hijacked_role': hijacked.get_role_display(),
'ip_address': request.META.get('REMOTE_ADDR'),
'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
'session_key': request.session.session_key,
}
logger.warning(
f"HIJACK START: {hijacker.email} masquerading as {hijacked.email}",
extra={'audit_data': log_entry}
)
@staticmethod
def log_hijack_end(hijacker, hijacked, request, duration_seconds=None):
"""
Log when a hijack session ends.
"""
log_entry = {
'timestamp': timezone.now().isoformat(),
'action': 'HIJACK_END',
'hijacker_id': hijacker.id,
'hijacker_email': hijacker.email,
'hijacked_id': hijacked.id,
'hijacked_email': hijacked.email,
'duration_seconds': duration_seconds,
'ip_address': request.META.get('REMOTE_ADDR'),
'session_key': request.session.session_key,
}
logger.warning(
f"HIJACK END: {hijacker.email} stopped masquerading as {hijacked.email}",
extra={'audit_data': log_entry}
)
@staticmethod
def log_hijack_denied(hijacker, hijacked, request, reason=''):
"""
Log when a hijack attempt is denied.
"""
log_entry = {
'timestamp': timezone.now().isoformat(),
'action': 'HIJACK_DENIED',
'hijacker_id': hijacker.id,
'hijacker_email': hijacker.email,
'hijacker_role': hijacker.get_role_display(),
'attempted_hijacked_id': hijacked.id,
'attempted_hijacked_email': hijacked.email,
'attempted_hijacked_role': hijacked.get_role_display(),
'denial_reason': reason,
'ip_address': request.META.get('REMOTE_ADDR'),
'user_agent': request.META.get('HTTP_USER_AGENT', '')[:200],
}
logger.error(
f"HIJACK DENIED: {hijacker.email} attempted to masquerade as {hijacked.email} - {reason}",
extra={'audit_data': log_entry}
)

271
core/models.py Normal file
View File

@@ -0,0 +1,271 @@
"""
Smooth Schedule Core Models
Multi-tenancy, Domain management, and Permission Grant system
"""
from django.db import models
from django.utils import timezone
from django_tenants.models import TenantMixin, DomainMixin
from datetime import timedelta
class Tenant(TenantMixin):
"""
Tenant model for multi-schema architecture.
Each tenant gets their own PostgreSQL schema for complete data isolation.
"""
name = models.CharField(max_length=100)
created_on = models.DateField(auto_now_add=True)
# Subscription & billing
is_active = models.BooleanField(default=True)
subscription_tier = models.CharField(
max_length=50,
choices=[
('FREE', 'Free Trial'),
('STARTER', 'Starter'),
('PROFESSIONAL', 'Professional'),
('ENTERPRISE', 'Enterprise'),
],
default='FREE'
)
# Feature flags
max_users = models.IntegerField(default=5)
max_resources = models.IntegerField(default=10)
# Metadata
contact_email = models.EmailField(blank=True)
phone = models.CharField(max_length=20, blank=True)
# Auto-created fields from TenantMixin:
# - schema_name (unique, indexed)
# - auto_create_schema
# - auto_drop_schema
class Meta:
ordering = ['name']
def __str__(self):
return self.name
class Domain(DomainMixin):
"""
Domain model for tenant routing.
Supports both subdomain (tenant.chronoflow.com) and custom domains.
"""
# Inherited from DomainMixin:
# - domain (unique, primary key)
# - tenant (ForeignKey to Tenant)
# - is_primary (boolean)
# Route53 integration fields for custom domains
is_custom_domain = models.BooleanField(
default=False,
help_text="True if this is a custom domain (not a subdomain of smoothschedule.com)"
)
route53_zone_id = models.CharField(
max_length=100,
blank=True,
null=True,
help_text="AWS Route53 Hosted Zone ID for this custom domain"
)
route53_record_set_id = models.CharField(
max_length=100,
blank=True,
null=True,
help_text="Route53 Record Set ID for DNS verification"
)
# SSL certificate management (for future AWS ACM integration)
ssl_certificate_arn = models.CharField(
max_length=200,
blank=True,
null=True,
help_text="AWS ACM Certificate ARN for HTTPS"
)
verified_at = models.DateTimeField(
null=True,
blank=True,
help_text="When the custom domain was verified via DNS"
)
class Meta:
ordering = ['domain']
def __str__(self):
domain_type = "Custom" if self.is_custom_domain else "Subdomain"
return f"{self.domain} ({domain_type})"
def is_verified(self):
"""Check if custom domain is verified"""
if not self.is_custom_domain:
return True # Subdomains are always verified
return self.verified_at is not None
class PermissionGrant(models.Model):
"""
Time-limited permission grants (30-minute window).
Used for temporary elevated access without permanently changing user permissions.
Example use cases:
- Support agent needs temporary access to tenant data
- Sales demo requiring elevated permissions
- Cross-tenant operations during migrations
"""
grantor = models.ForeignKey(
'users.User',
on_delete=models.CASCADE,
related_name='granted_permissions',
help_text="User who granted the permission"
)
grantee = models.ForeignKey(
'users.User',
on_delete=models.CASCADE,
related_name='received_permissions',
help_text="User who received the permission"
)
action = models.CharField(
max_length=100,
help_text="Specific action or permission being granted (e.g., 'view_billing', 'edit_settings')"
)
reason = models.TextField(
blank=True,
help_text="Justification for granting this permission (audit trail)"
)
granted_at = models.DateTimeField(auto_now_add=True)
expires_at = models.DateTimeField(
help_text="When this permission grant expires"
)
revoked_at = models.DateTimeField(
null=True,
blank=True,
help_text="If manually revoked before expiration"
)
# Audit metadata
ip_address = models.GenericIPAddressField(
null=True,
blank=True,
help_text="IP address where grant was created"
)
user_agent = models.TextField(
blank=True,
help_text="Browser/client user agent"
)
class Meta:
ordering = ['-granted_at']
indexes = [
models.Index(fields=['grantee', 'expires_at']),
models.Index(fields=['grantor', 'granted_at']),
]
def __str__(self):
return f"{self.grantor}{self.grantee}: {self.action} (expires {self.expires_at})"
def is_active(self):
"""
Check if this permission grant is currently active.
Returns False if expired or revoked.
"""
now = timezone.now()
# Check if revoked
if self.revoked_at is not None:
return False
# Check if expired
if now >= self.expires_at:
return False
return True
def revoke(self):
"""Manually revoke this permission grant"""
if self.revoked_at is None:
self.revoked_at = timezone.now()
self.save(update_fields=['revoked_at'])
def time_remaining(self):
"""Get timedelta of remaining active time (or None if expired/revoked)"""
if not self.is_active():
return None
now = timezone.now()
return self.expires_at - now
@classmethod
def create_grant(cls, grantor, grantee, action, reason="", duration_minutes=30, ip_address=None, user_agent=""):
"""
Factory method to create a time-limited permission grant.
Args:
grantor: User granting the permission
grantee: User receiving the permission
action: Permission action string
reason: Justification for audit trail
duration_minutes: How long the grant is valid (default 30)
ip_address: IP address of the request
user_agent: Browser user agent
Returns:
PermissionGrant instance
"""
now = timezone.now()
expires_at = now + timedelta(minutes=duration_minutes)
return cls.objects.create(
grantor=grantor,
grantee=grantee,
action=action,
reason=reason,
expires_at=expires_at,
ip_address=ip_address,
user_agent=user_agent
)
class TierLimit(models.Model):
"""
Defines resource limits for each subscription tier.
Used by HasQuota permission to enforce hard blocks.
"""
tier = models.CharField(
max_length=50,
choices=[
('FREE', 'Free Trial'),
('STARTER', 'Starter'),
('PROFESSIONAL', 'Professional'),
('ENTERPRISE', 'Enterprise'),
]
)
feature_code = models.CharField(
max_length=100,
help_text="Feature code (e.g., 'MAX_RESOURCES', 'MAX_USERS')"
)
limit = models.IntegerField(
default=0,
help_text="Maximum allowed count for this feature"
)
class Meta:
unique_together = ['tier', 'feature_code']
ordering = ['tier', 'feature_code']
def __str__(self):
return f"{self.tier} - {self.feature_code}: {self.limit}"

285
core/permissions.py Normal file
View File

@@ -0,0 +1,285 @@
"""
Smooth Schedule Hijack Permissions
Implements the masquerading "Matrix" - strict rules for who can impersonate whom
"""
from django.core.exceptions import PermissionDenied
def can_hijack(hijacker, hijacked):
"""
Determine if hijacker (the admin) can masquerade as hijacked (target user).
The Matrix:
┌──────────────────────┬─────────────────────────────────────────────────┐
│ Hijacker Role │ Can Hijack │
├──────────────────────┼─────────────────────────────────────────────────┤
│ SUPERUSER │ Anyone (full god mode) │
│ PLATFORM_SUPPORT │ TENANT_OWNER, TENANT_MANAGER, TENANT_STAFF │
│ PLATFORM_SALES │ Only users with is_temporary=True │
│ TENANT_OWNER │ TENANT_STAFF in same tenant only │
│ Others │ Nobody │
└──────────────────────┴─────────────────────────────────────────────────┘
Args:
hijacker: User attempting to impersonate (the admin)
hijacked: User being impersonated (the target)
Returns:
bool: True if hijack is allowed, False otherwise
Security Notes:
- Never allow self-hijacking
- Never allow hijacking of superusers (except by other superusers)
- Always validate tenant boundaries for tenant-scoped roles
- Log all hijack attempts (success and failure) for audit
"""
from users.models import User
# Safety check: can't hijack yourself
if hijacker.id == hijacked.id:
return False
# Safety check: only superusers can hijack other superusers
if hijacked.role == User.Role.SUPERUSER and hijacker.role != User.Role.SUPERUSER:
return False
# Rule 1: SUPERUSER can hijack anyone
if hijacker.role == User.Role.SUPERUSER:
return True
# Rule 2: PLATFORM_SUPPORT can hijack tenant users
if hijacker.role == User.Role.PLATFORM_SUPPORT:
return hijacked.role in [
User.Role.TENANT_OWNER,
User.Role.TENANT_MANAGER,
User.Role.TENANT_STAFF,
User.Role.CUSTOMER,
]
# Rule 3: PLATFORM_SALES can only hijack temporary demo accounts
if hijacker.role == User.Role.PLATFORM_SALES:
return hijacked.is_temporary
# Rule 4: TENANT_OWNER can hijack staff within their own tenant
if hijacker.role == User.Role.TENANT_OWNER:
# Must be in same tenant
if not hijacker.tenant or not hijacked.tenant:
return False
if hijacker.tenant.id != hijacked.tenant.id:
return False
# Can only hijack staff and customers, not other owners/managers
return hijacked.role in [
User.Role.TENANT_STAFF,
User.Role.CUSTOMER,
]
# Default: deny
return False
def can_hijack_or_403(hijacker, hijacked):
"""
Same as can_hijack but raises PermissionDenied instead of returning False.
Useful for views that want to use exception handling.
Args:
hijacker: User attempting to impersonate
hijacked: User being impersonated
Raises:
PermissionDenied: If hijack is not allowed
Returns:
bool: True if allowed (never returns False, raises instead)
"""
if not can_hijack(hijacker, hijacked):
raise PermissionDenied(
f"User {hijacker.email} ({hijacker.get_role_display()}) "
f"is not authorized to masquerade as {hijacked.email} ({hijacked.get_role_display()})"
)
return True
def get_hijackable_users(hijacker):
"""
Get queryset of all users that the hijacker can masquerade as.
Useful for building "Masquerade as..." dropdowns in the UI.
Args:
hijacker: User who wants to hijack
Returns:
QuerySet: Users that can be hijacked by this user
"""
from users.models import User
# Start with all users except self
qs = User.objects.exclude(id=hijacker.id)
# Apply filters based on hijacker role
if hijacker.role == User.Role.SUPERUSER:
# Superuser can hijack anyone
return qs
elif hijacker.role == User.Role.PLATFORM_SUPPORT:
# Can hijack all tenant-level users
return qs.filter(role__in=[
User.Role.TENANT_OWNER,
User.Role.TENANT_MANAGER,
User.Role.TENANT_STAFF,
User.Role.CUSTOMER,
])
elif hijacker.role == User.Role.PLATFORM_SALES:
# Only temporary demo accounts
return qs.filter(is_temporary=True)
elif hijacker.role == User.Role.TENANT_OWNER:
# Only staff in same tenant
if not hijacker.tenant:
return qs.none()
return qs.filter(
tenant=hijacker.tenant,
role__in=[User.Role.TENANT_STAFF, User.Role.CUSTOMER]
)
else:
# No one else can hijack
return qs.none()
def validate_hijack_chain(request):
"""
Validate that hijack chains are not too deep.
Prevents: Admin1 -> Admin2 -> Admin3 -> User scenarios.
Smooth Schedule Security Policy: Maximum hijack depth is 1.
You cannot hijack while already hijacked.
Args:
request: Django request object
Raises:
PermissionDenied: If already in a hijack session
Returns:
bool: True if allowed to start new hijack
"""
hijack_history = request.session.get('hijack_history', [])
if len(hijack_history) > 0:
raise PermissionDenied(
"Cannot start a new masquerade session while already masquerading. "
"Please exit your current session first."
)
return True
# ==============================================================================
# SaaS Quota Enforcement ("Hard Block" Logic)
# ==============================================================================
def HasQuota(feature_code):
"""
Permission factory for SaaS tier limit enforcement.
Returns a DRF permission class that blocks write operations when
tenant has exceeded their quota for a specific feature.
The "Hard Block": Prevents resource creation when tenant hits limit.
Usage:
class ResourceViewSet(ModelViewSet):
permission_classes = [IsAuthenticated, HasQuota('MAX_RESOURCES')]
Args:
feature_code: TierLimit feature code (e.g., 'MAX_RESOURCES', 'MAX_USERS')
Returns:
QuotaPermission class configured for the feature
How it Works:
1. Read operations (GET/HEAD/OPTIONS) always allowed
2. Write operations check current usage vs tier limit
3. If usage >= limit, raises PermissionDenied (403)
"""
from rest_framework.permissions import BasePermission
from django.apps import apps
class QuotaPermission(BasePermission):
"""
Dynamically generated permission class for quota checking.
"""
# Map feature codes to model paths for usage counting
# CRITICAL: This map must be populated for the permission to work
USAGE_MAP = {
'MAX_RESOURCES': 'schedule.Resource',
'MAX_USERS': 'users.User',
'MAX_EVENTS_PER_MONTH': 'schedule.Event',
# Add more mappings as needed
}
def has_permission(self, request, view):
"""
Check if tenant has quota for this operation.
Returns True for read operations, checks quota for writes.
"""
# Allow all read-only operations (GET, HEAD, OPTIONS)
if request.method in ['GET', 'HEAD', 'OPTIONS']:
return True
# Get tenant from request
tenant = getattr(request, 'tenant', None)
if not tenant:
# No tenant in request - allow for public schema operations
return True
# Get the model to check usage against
model_path = self.USAGE_MAP.get(feature_code)
if not model_path:
# Feature not mapped - fail safe by allowing
# (Production: you'd want to log this as a configuration error)
return True
# Get the model class
try:
app_label, model_name = model_path.split('.')
Model = apps.get_model(app_label, model_name)
except (ValueError, LookupError):
# Invalid model path - fail safe
return True
# Get the tier limit for this tenant
try:
from core.models import TierLimit
limit = TierLimit.objects.get(
tier=tenant.subscription_tier,
feature_code=feature_code
).limit
except TierLimit.DoesNotExist:
# No limit defined - allow (unlimited)
return True
# Count current usage
# NOTE: django-tenants automatically scopes this query to tenant schema
current_count = Model.objects.count()
# The "Hard Block": Enforce the limit
if current_count >= limit:
# Quota exceeded - deny the operation
from rest_framework.exceptions import PermissionDenied
raise PermissionDenied(
f"Quota exceeded: You have reached your plan limit of {limit} "
f"{feature_code.replace('MAX_', '').lower().replace('_', ' ')}. "
f"Please upgrade your subscription to add more."
)
# Quota available - allow the operation
return True
return QuotaPermission