Files
smoothschedule/smoothschedule/platform_admin/views.py
poduck dcb14503a2 feat: Dashboard redesign, plan permissions, and help docs improvements
Major updates including:
- Customizable dashboard with drag-and-drop widget grid layout
- Plan-based feature locking for plugins and tasks
- Comprehensive help documentation updates across all pages
- Plugin seeding in deployment process for all tenants
- Permission synchronization system for subscription plans
- QuotaOverageModal component and enhanced UX flows

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 13:02:44 -05:00

1401 lines
52 KiB
Python

"""
Platform Views
API views for platform-level operations
"""
import secrets
from datetime import timedelta
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import APIView
from django.db.models import Count
from django.db import transaction, connection
from django.utils import timezone
from django_tenants.utils import schema_context
from core.models import Tenant, Domain
from smoothschedule.users.models import User
from .models import TenantInvitation, PlatformSettings, SubscriptionPlan, PlatformEmailAddress
from .serializers import (
TenantSerializer,
TenantCreateSerializer,
TenantUpdateSerializer,
PlatformUserSerializer,
PlatformMetricsSerializer,
TenantInvitationSerializer,
TenantInvitationCreateSerializer,
TenantInvitationAcceptSerializer,
TenantInvitationDetailSerializer,
PlatformSettingsSerializer,
StripeKeysUpdateSerializer,
OAuthSettingsSerializer,
OAuthSettingsResponseSerializer,
SubscriptionPlanSerializer,
SubscriptionPlanCreateSerializer,
PlatformEmailAddressListSerializer,
PlatformEmailAddressSerializer,
PlatformEmailAddressCreateSerializer,
PlatformEmailAddressUpdateSerializer,
)
from .permissions import IsPlatformAdmin, IsPlatformUser
class PlatformSettingsView(APIView):
"""
GET /api/platform/settings/
Get platform settings (Stripe config status, etc.)
"""
permission_classes = [IsAuthenticated, IsPlatformAdmin]
def get(self, request):
settings = PlatformSettings.get_instance()
serializer = PlatformSettingsSerializer(settings)
return Response(serializer.data)
class StripeKeysView(APIView):
"""
POST /api/platform/settings/stripe/keys/
Update Stripe API keys
"""
permission_classes = [IsAuthenticated, IsPlatformAdmin]
def post(self, request):
serializer = StripeKeysUpdateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
settings = PlatformSettings.get_instance()
# Update keys if provided
if serializer.validated_data.get('stripe_secret_key'):
settings.stripe_secret_key = serializer.validated_data['stripe_secret_key']
if serializer.validated_data.get('stripe_publishable_key'):
settings.stripe_publishable_key = serializer.validated_data['stripe_publishable_key']
if serializer.validated_data.get('stripe_webhook_secret'):
settings.stripe_webhook_secret = serializer.validated_data['stripe_webhook_secret']
# Clear validation status when keys change
settings.stripe_keys_validated_at = None
settings.stripe_validation_error = ''
settings.stripe_account_id = ''
settings.stripe_account_name = ''
settings.save()
return Response(PlatformSettingsSerializer(settings).data)
class StripeValidateView(APIView):
"""
POST /api/platform/settings/stripe/validate/
Validate Stripe API keys by making a test API call
"""
permission_classes = [IsAuthenticated, IsPlatformAdmin]
def post(self, request):
settings = PlatformSettings.get_instance()
if not settings.has_stripe_keys():
return Response(
{'error': 'No Stripe keys configured'},
status=status.HTTP_400_BAD_REQUEST
)
try:
import stripe
stripe.api_key = settings.get_stripe_secret_key()
# Try to retrieve account info
account = stripe.Account.retrieve()
# Update settings with account info
settings.stripe_account_id = account.id
settings.stripe_account_name = account.get('business_profile', {}).get('name', '') or account.get('email', '')
settings.stripe_keys_validated_at = timezone.now()
settings.stripe_validation_error = ''
settings.save()
return Response({
'valid': True,
'account_id': settings.stripe_account_id,
'account_name': settings.stripe_account_name,
'settings': PlatformSettingsSerializer(settings).data
})
except stripe.error.AuthenticationError as e:
settings.stripe_validation_error = str(e)
settings.stripe_keys_validated_at = None
settings.save()
return Response({
'valid': False,
'error': 'Invalid API key',
'settings': PlatformSettingsSerializer(settings).data
}, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
settings.stripe_validation_error = str(e)
settings.save()
return Response({
'valid': False,
'error': str(e),
'settings': PlatformSettingsSerializer(settings).data
}, status=status.HTTP_400_BAD_REQUEST)
class GeneralSettingsView(APIView):
"""
POST /api/platform/settings/general/
Update general platform settings (email check interval, etc.)
"""
permission_classes = [IsAuthenticated, IsPlatformAdmin]
def post(self, request):
settings = PlatformSettings.get_instance()
# Update email check interval if provided
email_check_interval = request.data.get('email_check_interval_minutes')
if email_check_interval is not None:
try:
interval = int(email_check_interval)
if interval < 1:
return Response(
{'error': 'Email check interval must be at least 1 minute'},
status=status.HTTP_400_BAD_REQUEST
)
if interval > 60:
return Response(
{'error': 'Email check interval cannot exceed 60 minutes'},
status=status.HTTP_400_BAD_REQUEST
)
settings.email_check_interval_minutes = interval
except (ValueError, TypeError):
return Response(
{'error': 'Invalid email check interval'},
status=status.HTTP_400_BAD_REQUEST
)
settings.save()
return Response(PlatformSettingsSerializer(settings).data)
class OAuthSettingsView(APIView):
"""
GET/POST /api/platform/settings/oauth/
Get or update OAuth provider settings
"""
permission_classes = [IsAuthenticated, IsPlatformAdmin]
PROVIDERS = ['google', 'apple', 'facebook', 'linkedin', 'microsoft', 'twitter', 'twitch']
def _mask_secret(self, secret):
"""Mask a secret string"""
if not secret:
return ''
if len(secret) <= 8:
return '*' * len(secret)
return f"{secret[:4]}...{secret[-4:]}"
def _format_provider_settings(self, provider, oauth_settings):
"""Format provider settings for response"""
prefix = f"{provider}"
settings_dict = oauth_settings.get(provider, {})
result = {
'enabled': settings_dict.get('enabled', False),
'client_id': settings_dict.get('client_id', ''),
'client_secret': self._mask_secret(settings_dict.get('client_secret', '')),
}
# Add provider-specific fields
if provider == 'apple':
result['team_id'] = settings_dict.get('team_id', '')
result['key_id'] = settings_dict.get('key_id', '')
elif provider == 'microsoft':
result['tenant_id'] = settings_dict.get('tenant_id', '')
return result
def get(self, request):
settings = PlatformSettings.get_instance()
oauth_settings = settings.oauth_settings or {}
response_data = {
'oauth_allow_registration': oauth_settings.get('allow_registration', True),
}
for provider in self.PROVIDERS:
response_data[provider] = self._format_provider_settings(provider, oauth_settings)
return Response(response_data)
def post(self, request):
serializer = OAuthSettingsSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
settings = PlatformSettings.get_instance()
oauth_settings = settings.oauth_settings or {}
# Update allow_registration
if 'oauth_allow_registration' in serializer.validated_data:
oauth_settings['allow_registration'] = serializer.validated_data['oauth_allow_registration']
# Update each provider's settings
for provider in self.PROVIDERS:
if provider not in oauth_settings:
oauth_settings[provider] = {}
enabled_key = f'oauth_{provider}_enabled'
client_id_key = f'oauth_{provider}_client_id'
client_secret_key = f'oauth_{provider}_client_secret'
if enabled_key in serializer.validated_data:
oauth_settings[provider]['enabled'] = serializer.validated_data[enabled_key]
if client_id_key in serializer.validated_data:
oauth_settings[provider]['client_id'] = serializer.validated_data[client_id_key]
if client_secret_key in serializer.validated_data:
# Only update if not empty (don't overwrite with empty string)
if serializer.validated_data[client_secret_key]:
oauth_settings[provider]['client_secret'] = serializer.validated_data[client_secret_key]
# Provider-specific fields
if provider == 'apple':
if f'oauth_apple_team_id' in serializer.validated_data:
oauth_settings[provider]['team_id'] = serializer.validated_data['oauth_apple_team_id']
if f'oauth_apple_key_id' in serializer.validated_data:
oauth_settings[provider]['key_id'] = serializer.validated_data['oauth_apple_key_id']
elif provider == 'microsoft':
if f'oauth_microsoft_tenant_id' in serializer.validated_data:
oauth_settings[provider]['tenant_id'] = serializer.validated_data['oauth_microsoft_tenant_id']
settings.oauth_settings = oauth_settings
settings.save()
# Return updated settings
response_data = {
'oauth_allow_registration': oauth_settings.get('allow_registration', True),
}
for provider in self.PROVIDERS:
response_data[provider] = self._format_provider_settings(provider, oauth_settings)
return Response(response_data)
class StripeWebhooksView(APIView):
"""
GET /api/platform/settings/stripe/webhooks/
List all Stripe webhook endpoints
POST /api/platform/settings/stripe/webhooks/
Create a new webhook endpoint
"""
permission_classes = [IsAuthenticated, IsPlatformAdmin]
# Default events to subscribe to
DEFAULT_EVENTS = [
"checkout.session.completed",
"checkout.session.expired",
"customer.created",
"customer.updated",
"customer.deleted",
"customer.subscription.created",
"customer.subscription.updated",
"customer.subscription.deleted",
"customer.subscription.trial_will_end",
"invoice.created",
"invoice.finalized",
"invoice.paid",
"invoice.payment_failed",
"invoice.payment_action_required",
"payment_intent.succeeded",
"payment_intent.payment_failed",
"payment_intent.canceled",
"payment_method.attached",
"payment_method.detached",
"account.updated", # For Connect
"account.application.authorized",
"account.application.deauthorized",
]
def _format_webhook(self, endpoint):
"""Format webhook endpoint for response"""
return {
'id': endpoint.id,
'url': endpoint.url,
'status': endpoint.status,
'enabled_events': endpoint.enabled_events,
'api_version': endpoint.api_version,
'created': endpoint.created.isoformat() if endpoint.created else None,
'livemode': endpoint.livemode,
# Don't expose the secret, just indicate if it exists
'has_secret': bool(endpoint.secret),
}
def get(self, request):
"""List all webhook endpoints from Stripe"""
import stripe
from djstripe.models import WebhookEndpoint
settings = PlatformSettings.get_instance()
if not settings.has_stripe_keys():
return Response(
{'error': 'Stripe keys not configured'},
status=status.HTTP_400_BAD_REQUEST
)
try:
stripe.api_key = settings.get_stripe_secret_key()
# Fetch from Stripe API
stripe_webhooks = stripe.WebhookEndpoint.list(limit=100)
# Sync to local database and format response
webhooks = []
for wh in stripe_webhooks.data:
# Sync to dj-stripe
local_wh = WebhookEndpoint.sync_from_stripe_data(wh)
webhooks.append(self._format_webhook(local_wh))
return Response({
'webhooks': webhooks,
'count': len(webhooks),
})
except stripe.error.AuthenticationError:
return Response(
{'error': 'Invalid Stripe API key'},
status=status.HTTP_400_BAD_REQUEST
)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def post(self, request):
"""Create a new webhook endpoint"""
import stripe
from djstripe.models import WebhookEndpoint
settings = PlatformSettings.get_instance()
if not settings.has_stripe_keys():
return Response(
{'error': 'Stripe keys not configured'},
status=status.HTTP_400_BAD_REQUEST
)
url = request.data.get('url')
if not url:
return Response(
{'error': 'URL is required'},
status=status.HTTP_400_BAD_REQUEST
)
# Validate URL format
if not url.startswith('https://'):
return Response(
{'error': 'Webhook URL must use HTTPS'},
status=status.HTTP_400_BAD_REQUEST
)
enabled_events = request.data.get('enabled_events', self.DEFAULT_EVENTS)
description = request.data.get('description', 'SmoothSchedule Platform Webhook')
try:
stripe.api_key = settings.get_stripe_secret_key()
# Create webhook on Stripe
endpoint = stripe.WebhookEndpoint.create(
url=url,
enabled_events=enabled_events,
description=description,
metadata={'created_by': 'smoothschedule_platform'},
)
# The secret is only returned on creation - save it
webhook_secret = endpoint.secret
# Sync to local database
local_wh = WebhookEndpoint.sync_from_stripe_data(endpoint)
# Store the secret in local DB (it's not returned by Stripe after creation)
local_wh.secret = webhook_secret
local_wh.save()
# Also update platform settings if this is the primary webhook
if request.data.get('set_as_primary', False):
settings.stripe_webhook_secret = webhook_secret
settings.save()
return Response({
'webhook': self._format_webhook(local_wh),
'secret': webhook_secret, # Only returned on creation!
'message': 'Webhook created successfully. Save the secret - it will not be shown again.',
}, status=status.HTTP_201_CREATED)
except stripe.error.InvalidRequestError as e:
return Response(
{'error': str(e)},
status=status.HTTP_400_BAD_REQUEST
)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class StripeWebhookDetailView(APIView):
"""
GET /api/platform/settings/stripe/webhooks/<id>/
Get a specific webhook endpoint
PATCH /api/platform/settings/stripe/webhooks/<id>/
Update a webhook endpoint
DELETE /api/platform/settings/stripe/webhooks/<id>/
Delete a webhook endpoint
"""
permission_classes = [IsAuthenticated, IsPlatformAdmin]
def _format_webhook(self, endpoint):
"""Format webhook endpoint for response"""
return {
'id': endpoint.id,
'url': endpoint.url,
'status': endpoint.status,
'enabled_events': endpoint.enabled_events,
'api_version': endpoint.api_version,
'created': endpoint.created.isoformat() if endpoint.created else None,
'livemode': endpoint.livemode,
'has_secret': bool(endpoint.secret),
}
def get(self, request, webhook_id):
"""Get a specific webhook endpoint"""
import stripe
from djstripe.models import WebhookEndpoint
settings = PlatformSettings.get_instance()
if not settings.has_stripe_keys():
return Response(
{'error': 'Stripe keys not configured'},
status=status.HTTP_400_BAD_REQUEST
)
try:
stripe.api_key = settings.get_stripe_secret_key()
endpoint = stripe.WebhookEndpoint.retrieve(webhook_id)
local_wh = WebhookEndpoint.sync_from_stripe_data(endpoint)
return Response({'webhook': self._format_webhook(local_wh)})
except stripe.error.InvalidRequestError as e:
return Response(
{'error': 'Webhook endpoint not found'},
status=status.HTTP_404_NOT_FOUND
)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def patch(self, request, webhook_id):
"""Update a webhook endpoint"""
import stripe
from djstripe.models import WebhookEndpoint
settings = PlatformSettings.get_instance()
if not settings.has_stripe_keys():
return Response(
{'error': 'Stripe keys not configured'},
status=status.HTTP_400_BAD_REQUEST
)
try:
stripe.api_key = settings.get_stripe_secret_key()
# Build update params
update_params = {}
if 'url' in request.data:
url = request.data['url']
if not url.startswith('https://'):
return Response(
{'error': 'Webhook URL must use HTTPS'},
status=status.HTTP_400_BAD_REQUEST
)
update_params['url'] = url
if 'enabled_events' in request.data:
update_params['enabled_events'] = request.data['enabled_events']
if 'disabled' in request.data:
update_params['disabled'] = request.data['disabled']
if 'description' in request.data:
update_params['description'] = request.data['description']
if not update_params:
return Response(
{'error': 'No valid fields to update'},
status=status.HTTP_400_BAD_REQUEST
)
# Update on Stripe
endpoint = stripe.WebhookEndpoint.modify(webhook_id, **update_params)
# Sync to local database
local_wh = WebhookEndpoint.sync_from_stripe_data(endpoint)
return Response({
'webhook': self._format_webhook(local_wh),
'message': 'Webhook updated successfully',
})
except stripe.error.InvalidRequestError as e:
return Response(
{'error': str(e)},
status=status.HTTP_400_BAD_REQUEST
)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def delete(self, request, webhook_id):
"""Delete a webhook endpoint"""
import stripe
from djstripe.models import WebhookEndpoint
settings = PlatformSettings.get_instance()
if not settings.has_stripe_keys():
return Response(
{'error': 'Stripe keys not configured'},
status=status.HTTP_400_BAD_REQUEST
)
try:
stripe.api_key = settings.get_stripe_secret_key()
# Delete on Stripe
stripe.WebhookEndpoint.delete(webhook_id)
# Delete from local database
WebhookEndpoint.objects.filter(id=webhook_id).delete()
return Response({
'message': 'Webhook deleted successfully',
}, status=status.HTTP_200_OK)
except stripe.error.InvalidRequestError as e:
return Response(
{'error': str(e)},
status=status.HTTP_400_BAD_REQUEST
)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class StripeWebhookRotateSecretView(APIView):
"""
POST /api/platform/settings/stripe/webhooks/<id>/rotate-secret/
Rotate the webhook signing secret
"""
permission_classes = [IsAuthenticated, IsPlatformAdmin]
def post(self, request, webhook_id):
"""Rotate the webhook signing secret"""
import stripe
from djstripe.models import WebhookEndpoint
settings = PlatformSettings.get_instance()
if not settings.has_stripe_keys():
return Response(
{'error': 'Stripe keys not configured'},
status=status.HTTP_400_BAD_REQUEST
)
try:
stripe.api_key = settings.get_stripe_secret_key()
# Rotate the secret - this creates a new secret while keeping the old one valid briefly
# Note: Stripe API doesn't have a direct "rotate" - we need to delete and recreate
# or use the webhook endpoint's secret rotation if available
# Get current endpoint
current = stripe.WebhookEndpoint.retrieve(webhook_id)
# Delete and recreate with same settings
stripe.WebhookEndpoint.delete(webhook_id)
new_endpoint = stripe.WebhookEndpoint.create(
url=current.url,
enabled_events=current.enabled_events,
description=current.get('description', ''),
metadata=current.get('metadata', {}),
)
new_secret = new_endpoint.secret
# Sync to local database
WebhookEndpoint.objects.filter(id=webhook_id).delete()
local_wh = WebhookEndpoint.sync_from_stripe_data(new_endpoint)
local_wh.secret = new_secret
local_wh.save()
# Update platform settings if this was the primary webhook
if request.data.get('update_platform_secret', False):
settings.stripe_webhook_secret = new_secret
settings.save()
return Response({
'webhook_id': new_endpoint.id,
'secret': new_secret,
'message': 'Webhook secret rotated. Save the new secret - it will not be shown again.',
'note': 'The webhook ID has changed due to recreation.',
})
except stripe.error.InvalidRequestError as e:
return Response(
{'error': str(e)},
status=status.HTTP_400_BAD_REQUEST
)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class SubscriptionPlanViewSet(viewsets.ModelViewSet):
"""
ViewSet for managing subscription plans.
Platform admins only.
"""
queryset = SubscriptionPlan.objects.all().order_by('price_monthly', 'name')
serializer_class = SubscriptionPlanSerializer
permission_classes = [IsAuthenticated, IsPlatformAdmin]
def get_serializer_class(self):
if self.action == 'create':
return SubscriptionPlanCreateSerializer
return SubscriptionPlanSerializer
@action(detail=True, methods=['post'])
def sync_tenants(self, request, pk=None):
"""
Sync this plan's permissions to all tenants on this plan.
This is called explicitly by the admin after confirming they want to sync.
"""
plan = self.get_object()
from .tasks import sync_subscription_plan_to_tenants
import logging
logger = logging.getLogger(__name__)
logger.info(f"SubscriptionPlan '{plan.name}' (ID: {plan.id}) - "
f"sync to tenants requested by platform admin")
# Run the sync task
sync_subscription_plan_to_tenants.delay(plan.id)
# Count tenants on this plan
from core.models import Tenant
tenant_count = Tenant.objects.filter(subscription_plan=plan).count()
return Response({
'message': f'Syncing permissions to {tenant_count} tenant(s) on the "{plan.name}" plan',
'tenant_count': tenant_count
})
@action(detail=False, methods=['post'])
def sync_with_stripe(self, request):
"""
Sync subscription plans with Stripe products.
Creates Stripe products/prices for plans that don't have them.
"""
import stripe
from django.conf import settings
stripe.api_key = settings.STRIPE_SECRET_KEY
if not stripe.api_key:
return Response(
{'error': 'Stripe API key not configured'},
status=status.HTTP_400_BAD_REQUEST
)
synced = []
errors = []
for plan in SubscriptionPlan.objects.filter(is_active=True):
# Skip if already has Stripe IDs
if plan.stripe_product_id and plan.stripe_price_id:
synced.append({
'id': plan.id,
'name': plan.name,
'status': 'already_synced'
})
continue
try:
# Create or retrieve product
if not plan.stripe_product_id:
product = stripe.Product.create(
name=plan.name,
description=plan.description or f"{plan.name} subscription plan",
metadata={
'plan_id': str(plan.id),
'plan_type': plan.plan_type,
'business_tier': plan.business_tier
}
)
plan.stripe_product_id = product.id
else:
product = stripe.Product.retrieve(plan.stripe_product_id)
# Create price if we have monthly pricing
if not plan.stripe_price_id and plan.price_monthly:
price = stripe.Price.create(
product=product.id,
unit_amount=int(plan.price_monthly * 100),
currency='usd',
recurring={'interval': 'month'},
metadata={'plan_id': str(plan.id)}
)
plan.stripe_price_id = price.id
plan.save()
synced.append({
'id': plan.id,
'name': plan.name,
'status': 'synced',
'stripe_product_id': plan.stripe_product_id,
'stripe_price_id': plan.stripe_price_id
})
except stripe.error.StripeError as e:
errors.append({
'id': plan.id,
'name': plan.name,
'error': str(e)
})
return Response({
'synced': synced,
'errors': errors,
'message': f'Synced {len(synced)} plans, {len(errors)} errors'
})
class TenantViewSet(viewsets.ModelViewSet):
"""
ViewSet for viewing, creating, and updating tenants (businesses).
Platform admins only.
"""
queryset = Tenant.objects.all().order_by('-created_on')
serializer_class = TenantSerializer
permission_classes = [IsAuthenticated, IsPlatformAdmin]
http_method_names = ['get', 'post', 'patch', 'head', 'options'] # Allow GET, POST, and PATCH
def get_queryset(self):
"""Optionally filter by active status"""
queryset = super().get_queryset()
is_active = self.request.query_params.get('is_active')
if is_active is not None:
queryset = queryset.filter(is_active=is_active.lower() == 'true')
return queryset
def get_serializer_class(self):
"""Use different serializer for different actions"""
if self.action == 'create':
return TenantCreateSerializer
if self.action in ['partial_update', 'update']:
return TenantUpdateSerializer
return TenantSerializer
@action(detail=False, methods=['get'])
def metrics(self, request):
"""Get platform-wide tenant metrics"""
total_tenants = Tenant.objects.count()
active_tenants = Tenant.objects.filter(is_active=True).count()
metrics = {
'total_tenants': total_tenants,
'active_tenants': active_tenants,
'total_users': User.objects.count(),
'mrr': 0, # TODO: Calculate from billing
'growth_rate': 0.0, # TODO: Calculate growth
}
serializer = PlatformMetricsSerializer(metrics)
return Response(serializer.data)
class PlatformUserViewSet(viewsets.ModelViewSet):
"""
ViewSet for viewing and updating users across the platform.
Platform admins only.
"""
queryset = User.objects.all().order_by('-date_joined')
serializer_class = PlatformUserSerializer
permission_classes = [IsAuthenticated, IsPlatformAdmin]
http_method_names = ['get', 'post', 'patch', 'head', 'options'] # Allow GET, POST, and PATCH
def get_queryset(self):
"""Optionally filter by business or role"""
queryset = super().get_queryset()
# Filter by role
role = self.request.query_params.get('role')
if role:
queryset = queryset.filter(role=role)
# Filter by active status
is_active = self.request.query_params.get('is_active')
if is_active is not None:
queryset = queryset.filter(is_active=is_active.lower() == 'true')
# TODO: Filter by business when we add tenant reference to User
return queryset
@action(detail=True, methods=['post'])
def verify_email(self, request, pk=None):
"""Manually verify a user's email"""
user = self.get_object()
user.email_verified = True
user.save(update_fields=['email_verified'])
return Response({'status': 'email verified'})
def partial_update(self, request, *args, **kwargs):
"""
Update platform user.
Superusers can update anyone.
Platform managers can only update platform_support users.
"""
instance = self.get_object()
user = request.user
# Permission check: superusers can edit anyone
if user.role != User.Role.SUPERUSER:
# Platform managers can only edit platform_support users
if user.role == User.Role.PLATFORM_MANAGER:
if instance.role != User.Role.PLATFORM_SUPPORT:
return Response(
{"detail": "You can only edit Platform Support users."},
status=status.HTTP_403_FORBIDDEN
)
else:
return Response(
{"detail": "You do not have permission to edit users."},
status=status.HTTP_403_FORBIDDEN
)
# Update user fields
allowed_fields = ['username', 'email', 'first_name', 'last_name', 'is_active', 'role', 'permissions']
for field in allowed_fields:
if field in request.data:
if field == 'role':
# Validate role - only allow platform roles
role_value = request.data[field].upper()
if role_value not in ['PLATFORM_MANAGER', 'PLATFORM_SUPPORT']:
return Response(
{"detail": "Invalid role. Only platform_manager and platform_support are allowed."},
status=status.HTTP_400_BAD_REQUEST
)
setattr(instance, field, role_value)
elif field == 'permissions':
# Merge permissions - don't replace entirely
current_permissions = instance.permissions or {}
new_permissions = request.data[field]
# Only allow granting permissions that the current user has
for perm_key, perm_value in new_permissions.items():
if perm_key == 'can_approve_plugins':
# Only superusers or users with this permission can grant it
if user.role == User.Role.SUPERUSER or user.permissions.get('can_approve_plugins', False):
current_permissions[perm_key] = perm_value
elif perm_key == 'can_whitelist_urls':
# Only superusers or users with this permission can grant it
if user.role == User.Role.SUPERUSER or user.permissions.get('can_whitelist_urls', False):
current_permissions[perm_key] = perm_value
instance.permissions = current_permissions
else:
setattr(instance, field, request.data[field])
# Handle password update if provided
if 'password' in request.data and request.data['password']:
instance.set_password(request.data['password'])
instance.save()
serializer = self.get_serializer(instance)
return Response(serializer.data)
class TenantInvitationViewSet(viewsets.ModelViewSet):
"""
ViewSet for managing Tenant Invitations.
Platform admins only for all actions except token-based retrieval and acceptance.
"""
queryset = TenantInvitation.objects.all().order_by('-created_at')
serializer_class = TenantInvitationSerializer
permission_classes = [IsAuthenticated, IsPlatformAdmin]
http_method_names = ['get', 'post', 'delete', 'head', 'options']
def get_serializer_class(self):
if self.action == 'create':
return TenantInvitationCreateSerializer
return TenantInvitationSerializer
def perform_create(self, serializer):
# The create method on the model will handle cancelling old invitations
# and generating token/expires_at.
instance = serializer.save(invited_by=self.request.user)
# Send invitation email via Celery task
from .tasks import send_tenant_invitation_email
send_tenant_invitation_email.delay(instance.id)
@action(detail=True, methods=['post'])
def resend(self, request, pk=None):
"""Resend invitation email for a specific invitation."""
invitation = self.get_object()
if not invitation.is_valid():
return Response(
{"detail": "Invitation is not in a valid state to be resent."},
status=status.HTTP_400_BAD_REQUEST
)
# Update expires_at and token for resend (optional, but good practice)
invitation.expires_at = timezone.now() + timedelta(days=7)
invitation.token = secrets.token_urlsafe(32) # Generate new token
invitation.save()
# Send invitation email via Celery task
from .tasks import send_tenant_invitation_email
send_tenant_invitation_email.delay(invitation.id)
return Response({"detail": "Invitation email resent successfully."}, status=status.HTTP_200_OK)
@action(detail=True, methods=['post'])
def cancel(self, request, pk=None):
"""Cancel a pending invitation."""
invitation = self.get_object()
if invitation.status == TenantInvitation.Status.PENDING:
invitation.cancel()
return Response({"detail": "Invitation cancelled successfully."}, status=status.HTTP_200_OK)
return Response(
{"detail": "Only pending invitations can be cancelled."},
status=status.HTTP_400_BAD_REQUEST
)
# Public actions (no authentication required, accessible via token)
@action(detail=False, methods=['get'], url_path='token/(?P<token>[^/.]+)', permission_classes=[])
def retrieve_by_token(self, request, token=None):
"""Retrieve invitation details using a public token."""
try:
invitation = TenantInvitation.objects.get(token=token)
except TenantInvitation.DoesNotExist:
return Response({"detail": "Invitation not found or invalid token."}, status=status.HTTP_404_NOT_FOUND)
if not invitation.is_valid():
return Response({"detail": "Invitation is no longer valid."}, status=status.HTTP_400_BAD_REQUEST)
serializer = TenantInvitationDetailSerializer(invitation)
return Response(serializer.data)
@action(detail=False, methods=['post'], url_path='token/(?P<token>[^/.]+)/accept', permission_classes=[])
def accept(self, request, token=None):
"""Accept an invitation, create tenant and owner user."""
try:
invitation = TenantInvitation.objects.get(token=token)
except TenantInvitation.DoesNotExist:
return Response({"detail": "Invitation not found or invalid token."}, status=status.HTTP_404_NOT_FOUND)
if not invitation.is_valid():
return Response({"detail": "Invitation is no longer valid."}, status=status.HTTP_400_BAD_REQUEST)
serializer = TenantInvitationAcceptSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
# Force execution in public schema for tenant creation
with schema_context('public'):
with transaction.atomic():
# Create Tenant
subdomain = serializer.validated_data['subdomain'].lower()
tenant = Tenant.objects.create(
schema_name=subdomain,
name=serializer.validated_data['business_name'],
subscription_tier=invitation.subscription_tier,
max_users=invitation.get_effective_max_users(),
max_resources=invitation.get_effective_max_resources(),
contact_email=serializer.validated_data.get('contact_email', invitation.email),
phone=serializer.validated_data.get('phone', ''),
# Set platform permissions from invitation
can_manage_oauth_credentials=invitation.permissions.get('can_manage_oauth_credentials', False),
can_accept_payments=invitation.permissions.get('can_accept_payments', False),
can_use_custom_domain=invitation.permissions.get('can_use_custom_domain', False),
can_white_label=invitation.permissions.get('can_white_label', False),
can_api_access=invitation.permissions.get('can_api_access', False),
initial_setup_complete=True, # Mark as complete after onboarding
)
# Create primary domain
domain_name = f"{subdomain}.lvh.me" # TODO: Make base domain configurable
Domain.objects.create(
domain=domain_name,
tenant=tenant,
is_primary=True,
is_custom_domain=False,
)
# Create Owner User
owner_user = User.objects.create_user(
username=serializer.validated_data['email'],
email=serializer.validated_data['email'],
password=serializer.validated_data['password'],
first_name=serializer.validated_data['first_name'],
last_name=serializer.validated_data['last_name'],
role=User.Role.TENANT_OWNER,
tenant=tenant,
)
# Mark invitation as accepted
invitation.accept(tenant, owner_user)
return Response({"detail": "Invitation accepted, tenant and user created."}, status=status.HTTP_201_CREATED)
class PlatformEmailAddressViewSet(viewsets.ModelViewSet):
"""
ViewSet for managing platform email addresses.
These are email addresses hosted on mail.talova.net that are
managed directly via SSH commands to the mail server.
Platform admins only.
"""
queryset = PlatformEmailAddress.objects.all().order_by('-is_default', 'display_name')
permission_classes = [IsAuthenticated, IsPlatformAdmin]
def get_serializer_class(self):
if self.action == 'list':
return PlatformEmailAddressListSerializer
if self.action == 'create':
return PlatformEmailAddressCreateSerializer
if self.action in ['update', 'partial_update']:
return PlatformEmailAddressUpdateSerializer
return PlatformEmailAddressSerializer
def perform_destroy(self, instance):
"""Delete email address from both database and mail server."""
from .mail_server import get_mail_server_service
# Delete from mail server first
service = get_mail_server_service()
success, message = service.delete_and_unsync(instance)
if not success:
# Log the error but still delete from database
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Failed to delete email from mail server: {message}")
# Delete from database
instance.delete()
@action(detail=True, methods=['post'])
def sync(self, request, pk=None):
"""
Manually sync this email address to the mail server.
Creates the account if it doesn't exist, or updates the password if it does.
"""
from .mail_server import get_mail_server_service
email_address = self.get_object()
service = get_mail_server_service()
success, message = service.sync_account(email_address)
if success:
return Response({
'success': True,
'message': message,
'mail_server_synced': email_address.mail_server_synced,
'last_synced_at': email_address.last_synced_at,
})
else:
return Response({
'success': False,
'message': message,
'mail_server_synced': email_address.mail_server_synced,
'last_sync_error': email_address.last_sync_error,
}, status=status.HTTP_400_BAD_REQUEST)
@action(detail=True, methods=['post'])
def set_as_default(self, request, pk=None):
"""Set this email address as the default for platform support."""
email_address = self.get_object()
# Unset all other defaults
PlatformEmailAddress.objects.filter(
is_default=True
).exclude(pk=email_address.pk).update(is_default=False)
# Set this one as default
email_address.is_default = True
email_address.save()
return Response({
'success': True,
'message': f'{email_address.display_name} is now the default email address',
})
@action(detail=True, methods=['post'])
def test_imap(self, request, pk=None):
"""Test IMAP connection for this email address."""
import imaplib
email_address = self.get_object()
settings = email_address.get_imap_settings()
try:
if settings['use_ssl']:
imap = imaplib.IMAP4_SSL(settings['host'], settings['port'])
else:
imap = imaplib.IMAP4(settings['host'], settings['port'])
imap.login(settings['username'], settings['password'])
imap.select(settings['folder'])
imap.logout()
return Response({
'success': True,
'message': 'IMAP connection successful',
})
except Exception as e:
return Response({
'success': False,
'message': f'IMAP connection failed: {str(e)}',
}, status=status.HTTP_400_BAD_REQUEST)
@action(detail=True, methods=['post'])
def test_smtp(self, request, pk=None):
"""Test SMTP connection for this email address."""
import smtplib
email_address = self.get_object()
settings = email_address.get_smtp_settings()
try:
if settings['use_ssl']:
smtp = smtplib.SMTP_SSL(settings['host'], settings['port'])
else:
smtp = smtplib.SMTP(settings['host'], settings['port'])
if settings['use_tls']:
smtp.starttls()
smtp.login(settings['username'], settings['password'])
smtp.quit()
return Response({
'success': True,
'message': 'SMTP connection successful',
})
except Exception as e:
return Response({
'success': False,
'message': f'SMTP connection failed: {str(e)}',
}, status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['post'])
def test_mail_server(self, request):
"""Test SSH connection to the mail server."""
from .mail_server import get_mail_server_service
service = get_mail_server_service()
success, message = service.test_connection()
if success:
return Response({
'success': True,
'message': message,
})
else:
return Response({
'success': False,
'message': message,
}, status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['get'])
def mail_server_accounts(self, request):
"""List all email accounts on the mail server."""
from .mail_server import get_mail_server_service, MailServerError
service = get_mail_server_service()
try:
accounts = service.list_accounts()
return Response({
'success': True,
'accounts': accounts,
'count': len(accounts),
})
except MailServerError as e:
return Response({
'success': False,
'message': str(e),
}, status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['get'])
def available_domains(self, request):
"""Get available email domains."""
domains = [
{'value': choice[0], 'label': choice[1]}
for choice in PlatformEmailAddress.Domain.choices
]
return Response({
'domains': domains,
})
@action(detail=False, methods=['get'])
def assignable_users(self, request):
"""Get users that can be assigned to email addresses."""
from smoothschedule.users.models import User
users = User.objects.filter(
role__in=['superuser', 'platform_manager', 'platform_support'],
is_active=True
).order_by('first_name', 'last_name', 'email')
user_list = [
{
'id': user.id,
'email': user.email,
'first_name': user.first_name,
'last_name': user.last_name,
'full_name': user.get_full_name() or user.email,
'role': user.role,
}
for user in users
]
return Response({
'users': user_list,
})
@action(detail=True, methods=['post'])
def remove_local(self, request, pk=None):
"""
Remove email address from database only, without deleting from mail server.
Useful for removing an address from the platform while keeping
the mail server account intact.
"""
email_address = self.get_object()
email = email_address.email_address
display_name = email_address.display_name
# Just delete from database, don't touch mail server
email_address.delete()
return Response({
'success': True,
'message': f'Removed {display_name} ({email}) from database. Account still exists on mail server.',
})
@action(detail=False, methods=['post'])
def import_from_mail_server(self, request):
"""
Import existing email accounts from the mail server.
Only imports accounts with supported domains that don't already exist in the database.
"""
from .mail_server import get_mail_server_service, MailServerError
import secrets
service = get_mail_server_service()
try:
accounts = service.list_accounts()
except MailServerError as e:
return Response({
'success': False,
'message': str(e),
}, status=status.HTTP_400_BAD_REQUEST)
# Only import smoothschedule.com addresses
supported_domains = ['smoothschedule.com']
# Get existing email addresses (construct from local_part + domain)
existing_emails = set(
f"{addr.local_part}@{addr.domain}".lower()
for addr in PlatformEmailAddress.objects.only('local_part', 'domain')
)
imported = []
skipped = []
for account in accounts:
email = account.get('email', '')
if not email or '@' not in email:
continue
local_part, domain = email.rsplit('@', 1)
# Skip if domain not supported
if domain not in supported_domains:
skipped.append({
'email': email,
'reason': 'Unsupported domain',
})
continue
# Skip if already exists
if email.lower() in existing_emails:
skipped.append({
'email': email,
'reason': 'Already exists in database',
})
continue
# Create the email address with a placeholder password
# User will need to update the password to sync properly
placeholder_password = secrets.token_urlsafe(16)
try:
email_address = PlatformEmailAddress.objects.create(
display_name=local_part.title().replace('.', ' ').replace('-', ' '),
local_part=local_part,
domain=domain,
password=placeholder_password,
is_active=True,
is_default=False,
mail_server_synced=True, # Already exists on server
)
imported.append({
'id': email_address.id,
'email': email_address.email_address,
'display_name': email_address.display_name,
})
except Exception as e:
skipped.append({
'email': email,
'reason': str(e),
})
return Response({
'success': True,
'imported': imported,
'imported_count': len(imported),
'skipped': skipped,
'skipped_count': len(skipped),
'message': f'Imported {len(imported)} email addresses, skipped {len(skipped)}',
})