Files
smoothschedule/smoothschedule/schedule/views.py
poduck cfc1b36ada feat: Add SMTP settings and collapsible email configuration UI
- Add SMTP fields to TicketEmailSettings model (host, port, TLS/SSL, credentials, from email/name)
- Update serializers with SMTP fields and is_smtp_configured flag
- Add TicketEmailTestSmtpView for testing SMTP connections
- Update frontend API types and hooks for SMTP settings
- Add collapsible IMAP and SMTP configuration sections with "Configured" badges
- Fix TypeScript errors in mockData.ts (missing required fields, type mismatches)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-29 18:28:29 -05:00

1433 lines
51 KiB
Python

"""
Schedule App - DRF ViewSets
API endpoints for Resources and Events with quota enforcement.
"""
from rest_framework import viewsets, status
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework.response import Response
from rest_framework.decorators import action
from django.core.exceptions import ValidationError as DjangoValidationError
from .models import Resource, Event, Participant, ResourceType, ScheduledTask, TaskExecutionLog, PluginTemplate, PluginInstallation, EventPlugin, GlobalEventPlugin, EmailTemplate
from .serializers import (
ResourceSerializer, EventSerializer, ParticipantSerializer,
CustomerSerializer, ServiceSerializer, ResourceTypeSerializer, StaffSerializer,
ScheduledTaskSerializer, TaskExecutionLogSerializer, PluginInfoSerializer,
PluginTemplateSerializer, PluginTemplateListSerializer, PluginInstallationSerializer,
EventPluginSerializer, GlobalEventPluginSerializer,
EmailTemplateSerializer, EmailTemplateListSerializer, EmailTemplatePreviewSerializer
)
from .models import Service
from core.permissions import HasQuota
from smoothschedule.users.models import User
class ResourceTypeViewSet(viewsets.ModelViewSet):
    """
    CRUD endpoint for custom Resource Types.

    Intended permissions (not enforced yet -- see ``permission_classes``):
    - Caller must be authenticated
    - Only owners/managers may create/update/delete

    Operations:
    - List, create and update resource types
    - Delete a type, but only when it is neither a default type nor
      referenced by any resource (enforced in ``destroy``)
    """
    queryset = ResourceType.objects.all()
    serializer_class = ResourceTypeSerializer
    permission_classes = [AllowAny]  # TODO: Change to IsAuthenticated for production
    ordering = ['name']

    def destroy(self, request, *args, **kwargs):
        """
        Delete a resource type after checking that deletion is allowed.

        Returns 400 with an ``error`` message when the type is flagged as
        default or still has related resources; otherwise defers to the
        standard ModelViewSet deletion.
        """
        resource_type = self.get_object()

        # Work out whether (and why) the deletion has to be refused.
        refusal = None
        if resource_type.is_default:
            refusal = 'Cannot delete default resource types.'
        elif resource_type.resources.exists():
            refusal = (
                f"Cannot delete resource type '{resource_type.name}' because it is "
                f"in use by {resource_type.resources.count()} resource(s)."
            )

        if refusal is not None:
            return Response({'error': refusal}, status=status.HTTP_400_BAD_REQUEST)
        return super().destroy(request, *args, **kwargs)
class ResourceViewSet(viewsets.ModelViewSet):
    """
    CRUD endpoint for Resources.

    Intended permissions:
    - Caller must be authenticated
    - Creation is meant to be subject to the MAX_RESOURCES quota via the
      HasQuota permission (hard block once the tenant reaches its
      subscription tier limit)

    NOTE: neither authentication nor HasQuota is active at the moment;
    ``permission_classes`` is currently ``[AllowAny]``.
    """
    queryset = Resource.objects.all()
    serializer_class = ResourceSerializer

    # TODO: Re-enable authentication for production
    permission_classes = [AllowAny]  # Temporarily allow unauthenticated access for development

    # Filtering / searching / ordering configuration
    filterset_fields = ['is_active', 'max_concurrent_events']
    search_fields = ['name', 'description']
    ordering_fields = ['name', 'created_at', 'max_concurrent_events']
    ordering = ['name']

    def perform_create(self, serializer):
        """Persist a new resource; any quota check must come from the permission layer."""
        serializer.save()

    def perform_update(self, serializer):
        """Persist changes to an existing resource."""
        serializer.save()
class EventViewSet(viewsets.ModelViewSet):
    """
    API endpoint for managing Events.

    Intended permissions:
    - Must be authenticated (currently disabled, see ``permission_classes``)

    Validation:
    - Availability/capacity validation is delegated to EventSerializer
      (defined outside this module); this view adds none of its own.

    Query Parameters:
    - start_date: Only events whose start_time is on or after this datetime
    - end_date: Only events whose start_time is before this datetime
    - status: Filter by event status

    Date parameters are parsed with Django's ``parse_datetime``; values that
    cannot be parsed are ignored rather than rejected.
    """
    queryset = Event.objects.all()
    serializer_class = EventSerializer
    # TODO: Re-enable authentication for production
    permission_classes = [AllowAny]  # Temporarily allow unauthenticated access for development
    filterset_fields = ['status']
    search_fields = ['title', 'notes']
    ordering_fields = ['start_time', 'end_time', 'created_at']
    ordering = ['start_time']

    def get_queryset(self):
        """
        Return all events, optionally narrowed by start_date / end_date.

        Both bounds apply to ``start_time``: ``start_date`` is inclusive
        (``>=``) and ``end_date`` is exclusive (``<``).
        """
        from django.utils.dateparse import parse_datetime

        queryset = Event.objects.all()
        query_params = self.request.query_params
        bounds = (
            ('start_date', 'start_time__gte'),
            ('end_date', 'start_time__lt'),
        )
        for param, lookup in bounds:
            raw_value = query_params.get(param)
            if not raw_value:
                continue
            try:
                parsed = parse_datetime(raw_value)
            except ValueError:
                # parse_datetime raises for well-formatted but impossible
                # values (e.g. month 13). Treat them like any other
                # unparseable input instead of letting the request 500.
                parsed = None
            if parsed:
                queryset = queryset.filter(**{lookup: parsed})
        return queryset

    def perform_create(self, serializer):
        """
        Save a new event, recording the creator when one is known.

        Anonymous requests (possible while authentication is disabled)
        store ``created_by=None``.
        """
        # TODO: Re-enable authentication - this is temporary for development
        user = self.request.user
        serializer.save(created_by=user if user.is_authenticated else None)

    def perform_update(self, serializer):
        """
        Save changes to an existing event.

        Any availability re-validation happens inside the serializer; this
        method only persists the result.
        """
        serializer.save()
class ParticipantViewSet(viewsets.ModelViewSet):
    """
    CRUD endpoint for Event Participants.

    Used to add participants to, or remove them from, events. Requires an
    authenticated caller. Results can be filtered by event, role or
    content type and are returned newest first by default.
    """
    permission_classes = [IsAuthenticated]
    serializer_class = ParticipantSerializer
    queryset = Participant.objects.all()

    # Filtering and ordering configuration
    filterset_fields = ['event', 'role', 'content_type']
    ordering_fields = ['created_at']
    ordering = ['-created_at']
class CustomerViewSet(viewsets.ModelViewSet):
    """
    CRUD endpoint for Customers.

    A customer is a User whose role is CUSTOMER. Results are scoped to the
    requesting user's tenant when one is available, and always to the
    current sandbox/live mode.
    """
    serializer_class = CustomerSerializer
    # TODO: Re-enable authentication for production
    permission_classes = [AllowAny]  # Temporarily allow unauthenticated access for development
    filterset_fields = ['is_active']
    search_fields = ['email', 'first_name', 'last_name']
    ordering_fields = ['email', 'created_at']
    ordering = ['email']

    def get_queryset(self):
        """
        Build the customer queryset for this request.

        Filters applied, in order:
        - role is CUSTOMER
        - tenant of the requesting user (only when authenticated with a tenant)
        - is_sandbox matches ``request.sandbox_mode`` (False when unset)
        - ``status`` query param: 'Active' / 'Inactive' map to is_active
        - ``search`` query param: case-insensitive match on email or names
        """
        from django.db.models import Q

        request = self.request
        params = request.query_params
        customers = User.objects.filter(role=User.Role.CUSTOMER)

        # Tenant scoping is only possible for an authenticated user with a tenant.
        if request.user.is_authenticated and request.user.tenant:
            customers = customers.filter(tenant=request.user.tenant)

        # sandbox_mode is expected to be set on the request by middleware.
        customers = customers.filter(is_sandbox=getattr(request, 'sandbox_mode', False))

        # Any other status value leaves the queryset untouched.
        active_flag_by_status = {'Active': True, 'Inactive': False}
        requested_status = params.get('status')
        if requested_status in active_flag_by_status:
            customers = customers.filter(is_active=active_flag_by_status[requested_status])

        term = params.get('search')
        if term:
            matches_term = (
                Q(email__icontains=term)
                | Q(first_name__icontains=term)
                | Q(last_name__icontains=term)
            )
            customers = customers.filter(matches_term)

        return customers

    def perform_create(self, serializer):
        """
        Create a customer with role, sandbox flag and tenant forced server-side.

        The tenant is taken from the requesting user when available and is
        None otherwise.
        """
        requester = self.request.user
        has_tenant = requester.is_authenticated and requester.tenant
        serializer.save(
            role=User.Role.CUSTOMER,
            is_sandbox=getattr(self.request, 'sandbox_mode', False),
            tenant=requester.tenant if has_tenant else None,
        )
class ServiceViewSet(viewsets.ModelViewSet):
    """
    API endpoint for managing Services.

    Services are the offerings a business provides (e.g., Haircut, Massage).
    Only active services are listed unless ``?show_inactive=true`` is passed.
    """
    queryset = Service.objects.filter(is_active=True)
    serializer_class = ServiceSerializer
    # TODO: Re-enable authentication for production
    permission_classes = [AllowAny]  # Temporarily allow unauthenticated access for development
    filterset_fields = ['is_active']
    search_fields = ['name', 'description']
    ordering_fields = ['name', 'price', 'duration', 'display_order', 'created_at']
    ordering = ['display_order', 'name']

    def get_queryset(self):
        """Return services, optionally including inactive ones."""
        queryset = Service.objects.all()
        # By default only show active services
        show_inactive = self.request.query_params.get('show_inactive', 'false')
        if show_inactive.lower() != 'true':
            queryset = queryset.filter(is_active=True)
        return queryset

    @action(detail=False, methods=['post'])
    def reorder(self, request):
        """
        Bulk update service display order.

        Expects: { "order": [1, 3, 2, 5, 4] }
        Where the list contains service IDs in the desired display order;
        each service's ``display_order`` becomes its index in the list.

        Returns 400 when ``order`` is not a list or contains a value that
        is not an integer ID. IDs that match no service are silently
        skipped. ``updated`` in the response is the number of IDs received.
        """
        from django.db import transaction

        invalid_order = Response(
            {'error': 'order must be a list of service IDs'},
            status=status.HTTP_400_BAD_REQUEST
        )

        order = request.data.get('order', [])
        if not isinstance(order, list):
            return invalid_order

        # Validate up front so a malformed ID yields a 400 instead of an
        # ORM error part-way through the updates.
        try:
            service_ids = [int(service_id) for service_id in order]
        except (TypeError, ValueError, OverflowError):
            return invalid_order

        # Apply the whole reorder atomically so a failure cannot leave the
        # services in a half-reordered state.
        with transaction.atomic():
            for index, service_id in enumerate(service_ids):
                Service.objects.filter(id=service_id).update(display_order=index)

        return Response({'status': 'ok', 'updated': len(service_ids)})
class StaffViewSet(viewsets.ModelViewSet):
    """
    Endpoint for staff members, i.e. Users whose role is TENANT_OWNER,
    TENANT_MANAGER or TENANT_STAFF.

    Routes:
    - GET /api/staff/ - List staff members
    - GET /api/staff/{id}/ - Get staff member details
    - PATCH /api/staff/{id}/ - Update staff member (is_active, permissions)
    - POST /api/staff/{id}/toggle_active/ - Toggle active status
    """
    serializer_class = StaffSerializer
    # TODO: Re-enable authentication for production
    permission_classes = [AllowAny]
    search_fields = ['email', 'first_name', 'last_name']
    ordering_fields = ['email', 'first_name', 'last_name']
    ordering = ['first_name', 'last_name']
    # PUT and DELETE are not exposed - staff are managed via invitations.
    # 'post' stays enabled because custom actions such as toggle_active need it.
    http_method_names = ['get', 'patch', 'post', 'head', 'options']

    def get_queryset(self):
        """
        Build the staff queryset for this request.

        Filters applied:
        - role is one of TENANT_OWNER, TENANT_MANAGER, TENANT_STAFF
        - inactive users are dropped only when ``show_inactive`` is given
          and is not 'true' (the default includes them so they can be
          reactivated)
        - is_sandbox matches ``request.sandbox_mode`` (False when unset)
        - ``search`` query param: case-insensitive match on email or names

        Tenant scoping is not applied yet; see the TODO below.
        """
        from django.db.models import Q

        params = self.request.query_params
        staff_roles = [
            User.Role.TENANT_OWNER,
            User.Role.TENANT_MANAGER,
            User.Role.TENANT_STAFF,
        ]
        staff = User.objects.filter(role__in=staff_roles)

        include_inactive = params.get('show_inactive', 'true').lower() == 'true'
        if not include_inactive:
            staff = staff.filter(is_active=True)

        # TODO: Once authentication is enabled, restrict to the requesting
        # user's tenant (filter on tenant=self.request.user.tenant when the
        # user is authenticated and has a tenant).

        # sandbox_mode is expected to be set on the request by middleware.
        staff = staff.filter(is_sandbox=getattr(self.request, 'sandbox_mode', False))

        term = params.get('search')
        if term:
            matches_term = (
                Q(email__icontains=term)
                | Q(first_name__icontains=term)
                | Q(last_name__icontains=term)
            )
            staff = staff.filter(matches_term)

        return staff

    def partial_update(self, request, *args, **kwargs):
        """
        Update a staff member, accepting only ``is_active`` and ``permissions``.

        Every other key in the request body is dropped before validation.

        Planned rule (not enforced yet): owners can edit any staff member,
        managers can only edit plain staff (not other managers or owners).
        """
        member = self.get_object()

        # TODO: Add permission checks when authentication is enabled:
        # a TENANT_MANAGER editing a TENANT_OWNER or TENANT_MANAGER should
        # get a 403 ('Managers cannot edit owners or other managers.').

        editable = {'is_active', 'permissions'}
        payload = {
            field: value
            for field, value in request.data.items()
            if field in editable
        }

        serializer = self.get_serializer(member, data=payload, partial=True)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def toggle_active(self, request, pk=None):
        """Flip ``is_active`` on a staff member and report the new state."""
        member = self.get_object()

        # TODO: When authentication is enabled, refuse with a 400
        # ('You cannot deactivate your own account.') if the caller is
        # toggling their own account.

        member.is_active = not member.is_active
        member.save(update_fields=['is_active'])

        outcome = 'activated' if member.is_active else 'deactivated'
        return Response({
            'id': member.id,
            'is_active': member.is_active,
            'message': f"Staff member {outcome} successfully."
        })
class ScheduledTaskViewSet(viewsets.ModelViewSet):
    """
    API endpoint for managing scheduled tasks.

    Intended permissions (not enforced yet -- see ``permission_classes``):
    - Must be authenticated
    - Only owners/managers can create/update/delete

    Features:
    - Standard list/create/update/delete
    - POST {id}/pause/ and {id}/resume/
    - POST {id}/execute/ to trigger a manual run
    - GET {id}/logs/ to page through execution logs
    """
    queryset = ScheduledTask.objects.all()
    serializer_class = ScheduledTaskSerializer
    permission_classes = [AllowAny]  # TODO: Change to IsAuthenticated for production
    ordering = ['-created_at']

    def perform_create(self, serializer):
        """Save a new task (creator is not recorded until auth is enabled)."""
        # TODO: Uncomment when auth is enabled
        # serializer.save(created_by=self.request.user)
        serializer.save()

    @action(detail=True, methods=['post'])
    def pause(self, request, pk=None):
        """Pause a scheduled task; returns 400 if it is already paused."""
        task = self.get_object()
        if task.status == ScheduledTask.Status.PAUSED:
            return Response(
                {'error': 'Task is already paused'},
                status=status.HTTP_400_BAD_REQUEST
            )
        task.status = ScheduledTask.Status.PAUSED
        task.save(update_fields=['status'])
        return Response({
            'id': task.id,
            'status': task.status,
            'message': 'Task paused successfully'
        })

    @action(detail=True, methods=['post'])
    def resume(self, request, pk=None):
        """Resume a paused scheduled task; returns 400 if it is not paused."""
        task = self.get_object()
        if task.status != ScheduledTask.Status.PAUSED:
            return Response(
                {'error': 'Task is not paused'},
                status=status.HTTP_400_BAD_REQUEST
            )
        task.status = ScheduledTask.Status.ACTIVE
        # NOTE(review): only 'status' is written below. This relies on
        # update_next_run_time() persisting next_run_at itself -- confirm
        # against the model, otherwise the recomputed time is lost.
        task.update_next_run_time()
        task.save(update_fields=['status'])
        return Response({
            'id': task.id,
            'status': task.status,
            'next_run_at': task.next_run_at,
            'message': 'Task resumed successfully'
        })

    @action(detail=True, methods=['post'])
    def execute(self, request, pk=None):
        """Queue the task for immediate execution and return the Celery task id."""
        task = self.get_object()
        # Import here to avoid circular dependency
        from .tasks import execute_scheduled_task
        # Queue the task for immediate execution
        result = execute_scheduled_task.delay(task.id)
        return Response({
            'id': task.id,
            'celery_task_id': result.id,
            'message': 'Task queued for execution'
        })

    @action(detail=True, methods=['get'])
    def logs(self, request, pk=None):
        """
        Return a slice of this task's execution logs.

        Query params:
        - limit: maximum number of logs to return (default 20)
        - offset: number of logs to skip (default 0)

        Returns 400 when either value is not a non-negative integer.
        ``count`` in the response is the total number of logs for the task.
        """
        task = self.get_object()

        try:
            limit = int(request.query_params.get('limit', 20))
            offset = int(request.query_params.get('offset', 0))
        except (TypeError, ValueError):
            return Response(
                {'error': 'limit and offset must be integers'},
                status=status.HTTP_400_BAD_REQUEST
            )
        # Querysets do not support negative slicing, so reject it explicitly.
        if limit < 0 or offset < 0:
            return Response(
                {'error': 'limit and offset must be non-negative'},
                status=status.HTTP_400_BAD_REQUEST
            )

        logs = task.execution_logs.all()[offset:offset + limit]
        serializer = TaskExecutionLogSerializer(logs, many=True)
        return Response({
            'count': task.execution_logs.count(),
            'results': serializer.data
        })
class TaskExecutionLogViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Read-only endpoint for task execution logs.

    Supports listing logs and retrieving a single log. The list can be
    narrowed with the ``task_id`` and ``status`` query parameters.
    """
    queryset = TaskExecutionLog.objects.select_related('scheduled_task').all()
    serializer_class = TaskExecutionLogSerializer
    permission_classes = [AllowAny]  # TODO: Change to IsAuthenticated for production
    ordering = ['-started_at']

    def get_queryset(self):
        """Apply the optional ``task_id`` and ``status`` filters to the base queryset."""
        logs = super().get_queryset()
        params = self.request.query_params

        # (query parameter, ORM lookup) pairs; empty or missing values are skipped.
        optional_filters = (
            ('task_id', 'scheduled_task_id'),
            ('status', 'status'),
        )
        for param, lookup in optional_filters:
            value = params.get(param)
            if value:
                logs = logs.filter(**{lookup: value})

        return logs
class PluginViewSet(viewsets.ViewSet):
    """
    Read-only endpoint over the plugin registry.

    Routes:
    - list: every registered plugin
    - by_category: plugins grouped by category
    - retrieve: details for one plugin, looked up by registry key
    """
    permission_classes = [AllowAny]  # TODO: Change to IsAuthenticated for production

    def list(self, request):
        """Return all registered plugins, serialized with PluginInfoSerializer."""
        from .plugins import registry

        return Response(PluginInfoSerializer(registry.list_all(), many=True).data)

    @action(detail=False, methods=['get'])
    def by_category(self, request):
        """Return the registry's category grouping as-is (no serializer applied)."""
        from .plugins import registry

        return Response(registry.list_by_category())

    def retrieve(self, request, pk=None):
        """Return details for the plugin registered under ``pk``, or 404."""
        from .plugins import registry

        plugin_class = registry.get(pk)
        if not plugin_class:
            return Response(
                {'error': f"Plugin '{pk}' not found"},
                status=status.HTTP_404_NOT_FOUND
            )

        # Copy the descriptive attributes off the plugin class.
        exposed_attributes = (
            'name',
            'display_name',
            'description',
            'category',
            'config_schema',
        )
        plugin_info = {
            attribute: getattr(plugin_class, attribute)
            for attribute in exposed_attributes
        }
        return Response(PluginInfoSerializer(plugin_info).data)
class PluginTemplateViewSet(viewsets.ModelViewSet):
    """
    API endpoint for managing plugin templates.

    Features:
    - List plugin templates (filtered by the ``view`` query parameter)
    - Create / update / delete templates
    - Publish to / unpublish from the marketplace
    - Install a template as a ScheduledTask
    - Request approval (for marketplace publishing)
    - Approve / reject templates (intended for platform admins only;
      the permission check is still a TODO)
    """
    queryset = PluginTemplate.objects.all()
    serializer_class = PluginTemplateSerializer
    permission_classes = [AllowAny]  # TODO: Change to IsAuthenticated for production
    ordering = ['-created_at']
    filterset_fields = ['visibility', 'category', 'is_approved']
    search_fields = ['name', 'short_description', 'description', 'tags']

    def get_queryset(self):
        """
        Filter templates by the ``view`` query parameter (default 'marketplace').

        - marketplace: PLATFORM templates plus approved PUBLIC templates
        - my_plugins: templates authored by the requesting user (empty for
          anonymous requests)
        - platform: PLATFORM templates only
        - any other value: no visibility filter (meant for platform admins)

        ``category`` and ``search`` query parameters narrow the result further.
        """
        from django.db.models import Q

        queryset = super().get_queryset()
        view_mode = self.request.query_params.get('view', 'marketplace')
        if view_mode == 'marketplace':
            # Public marketplace - platform official + approved public templates
            queryset = queryset.filter(
                Q(visibility=PluginTemplate.Visibility.PLATFORM) |
                Q(visibility=PluginTemplate.Visibility.PUBLIC, is_approved=True)
            )
        elif view_mode == 'my_plugins':
            # User's own templates
            if self.request.user.is_authenticated:
                queryset = queryset.filter(author=self.request.user)
            else:
                queryset = queryset.none()
        elif view_mode == 'platform':
            # Platform official plugins
            queryset = queryset.filter(visibility=PluginTemplate.Visibility.PLATFORM)
        # else: all templates (for platform admins)

        # Filter by category if provided
        category = self.request.query_params.get('category')
        if category:
            queryset = queryset.filter(category=category)

        # Filter by search query
        search = self.request.query_params.get('search')
        if search:
            queryset = queryset.filter(
                Q(name__icontains=search) |
                Q(short_description__icontains=search) |
                Q(description__icontains=search) |
                Q(tags__icontains=search)
            )
        return queryset

    def get_serializer_class(self):
        """Use the lightweight serializer for the list action only."""
        if self.action == 'list':
            return PluginTemplateListSerializer
        return PluginTemplateSerializer

    def perform_create(self, serializer):
        """Save a new template with its author and extracted template variables."""
        from .template_parser import TemplateVariableParser

        plugin_code = serializer.validated_data.get('plugin_code', '')
        template_vars = TemplateVariableParser.extract_variables(plugin_code)
        # Convert to dict format expected by model (keyed by variable name)
        template_vars_dict = {var['name']: var for var in template_vars}
        serializer.save(
            author=self.request.user if self.request.user.is_authenticated else None,
            template_variables=template_vars_dict
        )

    @action(detail=True, methods=['post'])
    def publish(self, request, pk=None):
        """
        Publish a template to the marketplace.

        Returns 403 when the caller is not the author, 400 when the
        template is not approved or the model raises a validation error.
        """
        template = self.get_object()
        # Check ownership
        if template.author != request.user:
            return Response(
                {'error': 'You can only publish your own templates'},
                status=status.HTTP_403_FORBIDDEN
            )
        # Check if approved
        if not template.is_approved:
            return Response(
                {'error': 'Template must be approved before publishing to marketplace'},
                status=status.HTTP_400_BAD_REQUEST
            )
        # Publish
        try:
            template.publish_to_marketplace(request.user)
            return Response({
                'message': 'Template published to marketplace successfully',
                'slug': template.slug
            })
        except DjangoValidationError as e:
            return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

    @action(detail=True, methods=['post'])
    def unpublish(self, request, pk=None):
        """Unpublish a template from the marketplace (author only, else 403)."""
        template = self.get_object()
        # Check ownership
        if template.author != request.user:
            return Response(
                {'error': 'You can only unpublish your own templates'},
                status=status.HTTP_403_FORBIDDEN
            )
        template.unpublish_from_marketplace()
        return Response({
            'message': 'Template unpublished from marketplace successfully'
        })

    @action(detail=True, methods=['post'])
    def install(self, request, pk=None):
        """
        Install a plugin template as a ScheduledTask.

        Expects:
        {
            "name": "Task Name",
            "description": "Task Description",
            "config_values": {"variable1": "value1", ...},
            "schedule_type": "CRON",
            "cron_expression": "0 0 * * *"
        }
        ``interval_minutes`` and ``run_at`` are also read when present.

        Returns 403 for a private template the caller does not own, 400
        for an unapproved public template, a missing name or a template
        compilation error, and 201 with the new task/installation ids on
        success.
        """
        from django.db import transaction
        from django.db.models import F
        from .template_parser import TemplateVariableParser

        template = self.get_object()

        # Check if template is accessible
        if template.visibility == PluginTemplate.Visibility.PRIVATE:
            if not request.user.is_authenticated or template.author != request.user:
                return Response(
                    {'error': 'This template is private'},
                    status=status.HTTP_403_FORBIDDEN
                )
        elif template.visibility == PluginTemplate.Visibility.PUBLIC:
            if not template.is_approved:
                return Response(
                    {'error': 'This template has not been approved'},
                    status=status.HTTP_400_BAD_REQUEST
                )

        # Read the installation parameters
        name = request.data.get('name')
        description = request.data.get('description', '')
        config_values = request.data.get('config_values', {})
        schedule_type = request.data.get('schedule_type')
        cron_expression = request.data.get('cron_expression')
        interval_minutes = request.data.get('interval_minutes')
        run_at = request.data.get('run_at')
        if not name:
            return Response(
                {'error': 'name is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Compile template with config values
        try:
            compiled_code = TemplateVariableParser.compile_template(
                template.plugin_code,
                config_values,
                context={}  # TODO: Add business context
            )
        except ValueError as e:
            return Response(
                {'error': f'Configuration error: {str(e)}'},
                status=status.HTTP_400_BAD_REQUEST
            )

        installer = request.user if request.user.is_authenticated else None

        # Create the task, the installation record and the counter bump as
        # one unit so a failure cannot leave an orphaned ScheduledTask.
        with transaction.atomic():
            scheduled_task = ScheduledTask.objects.create(
                name=name,
                description=description,
                plugin_name='custom_script',  # Use custom script plugin
                plugin_code=compiled_code,
                plugin_config={},
                schedule_type=schedule_type,
                cron_expression=cron_expression,
                interval_minutes=interval_minutes,
                run_at=run_at,
                status=ScheduledTask.Status.ACTIVE,
                created_by=installer
            )
            installation = PluginInstallation.objects.create(
                template=template,
                scheduled_task=scheduled_task,
                installed_by=installer,
                config_values=config_values,
                template_version_hash=template.plugin_code_hash
            )
            # Increment in the database rather than in Python so concurrent
            # installs cannot overwrite each other's count.
            PluginTemplate.objects.filter(pk=template.pk).update(
                install_count=F('install_count') + 1
            )

        return Response({
            'message': 'Plugin installed successfully',
            'scheduled_task_id': scheduled_task.id,
            'installation_id': installation.id
        }, status=status.HTTP_201_CREATED)

    @action(detail=True, methods=['post'])
    def request_approval(self, request, pk=None):
        """
        Request approval for marketplace publishing.

        Returns 403 when the caller is not the author and 400 when the
        template is already approved or fails validation. No notification
        is sent yet; a successful call only returns a confirmation.
        """
        template = self.get_object()
        # Check ownership
        if template.author != request.user:
            return Response(
                {'error': 'You can only request approval for your own templates'},
                status=status.HTTP_403_FORBIDDEN
            )
        # Check if already approved or pending
        if template.is_approved:
            return Response(
                {'error': 'Template is already approved'},
                status=status.HTTP_400_BAD_REQUEST
            )
        # Validate plugin code
        validation = template.can_be_published()
        if not validation:
            # Re-run the whitelist check to get the error details to report.
            from .safe_scripting import validate_plugin_whitelist
            errors = validate_plugin_whitelist(template.plugin_code)
            return Response(
                {'error': 'Template has validation errors', 'errors': errors['errors']},
                status=status.HTTP_400_BAD_REQUEST
            )
        # TODO: Notify platform admins about approval request
        # For now, just return success
        return Response({
            'message': 'Approval requested successfully. A platform administrator will review your plugin.',
            'template_id': template.id
        })

    @action(detail=True, methods=['post'])
    def approve(self, request, pk=None):
        """
        Approve a template for the marketplace (intended for platform admins).

        Returns 400 when the template is already approved or fails the
        plugin whitelist validation.
        """
        # TODO: Add permission check for platform admins
        # if not request.user.has_perm('can_approve_plugins'):
        #     return Response({'error': 'Permission denied'}, status=status.HTTP_403_FORBIDDEN)
        template = self.get_object()
        if template.is_approved:
            return Response(
                {'error': 'Template is already approved'},
                status=status.HTTP_400_BAD_REQUEST
            )
        # Validate plugin code
        from .safe_scripting import validate_plugin_whitelist
        validation = validate_plugin_whitelist(template.plugin_code, scheduled_task=None)
        if not validation['valid']:
            return Response(
                {'error': 'Template has validation errors', 'errors': validation['errors']},
                status=status.HTTP_400_BAD_REQUEST
            )
        # Approve
        from django.utils import timezone
        template.is_approved = True
        template.approved_by = request.user if request.user.is_authenticated else None
        template.approved_at = timezone.now()
        template.rejection_reason = ''
        template.save()
        return Response({
            'message': 'Template approved successfully',
            'template_id': template.id
        })

    @action(detail=True, methods=['post'])
    def reject(self, request, pk=None):
        """
        Reject a template for the marketplace (intended for platform admins).

        Stores the optional ``reason`` from the request body, defaulting to
        'No reason provided'.
        """
        # TODO: Add permission check for platform admins
        # if not request.user.has_perm('can_approve_plugins'):
        #     return Response({'error': 'Permission denied'}, status=status.HTTP_403_FORBIDDEN)
        template = self.get_object()
        reason = request.data.get('reason', 'No reason provided')
        template.is_approved = False
        template.rejection_reason = reason
        template.save()
        return Response({
            'message': 'Template rejected',
            'reason': reason
        })
class PluginInstallationViewSet(viewsets.ModelViewSet):
    """
    API endpoint for managing plugin installations.

    Features:
    - List installed plugins
    - View installation details
    - Update an installation to the latest template version
    - Uninstall a plugin
    - Rate and review a plugin
    """
    queryset = PluginInstallation.objects.select_related('template', 'scheduled_task').all()
    serializer_class = PluginInstallationSerializer
    permission_classes = [AllowAny]  # TODO: Change to IsAuthenticated for production
    ordering = ['-installed_at']

    def get_queryset(self):
        """Return all installations (tenant scoping is not applied yet)."""
        queryset = super().get_queryset()
        # TODO: Filter by tenant when multi-tenancy is fully enabled
        # if self.request.user.is_authenticated and self.request.user.tenant:
        #     queryset = queryset.filter(scheduled_task__tenant=self.request.user.tenant)
        return queryset

    @action(detail=True, methods=['post'])
    def update_to_latest(self, request, pk=None):
        """
        Update an installed plugin to the latest template version.

        Returns 400 when no update is available or the model raises a
        validation error while updating.
        """
        installation = self.get_object()
        if not installation.has_update_available():
            return Response(
                {'error': 'No update available'},
                status=status.HTTP_400_BAD_REQUEST
            )
        try:
            installation.update_to_latest()
            return Response({
                'message': 'Plugin updated successfully',
                'new_version_hash': installation.template_version_hash
            })
        except DjangoValidationError as e:
            return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

    @action(detail=True, methods=['post'])
    def rate(self, request, pk=None):
        """
        Rate (and optionally review) an installed plugin.

        Expects ``rating`` as an integer from 1 to 5 and an optional
        ``review`` string. Returns 400 for any other rating value. When the
        installation still has a template, its average rating and rating
        count are recomputed from all rated installations.
        """
        installation = self.get_object()
        rating = request.data.get('rating')
        review = request.data.get('review', '')

        # bool is a subclass of int, so exclude it explicitly; otherwise a
        # JSON `true` would be accepted as a rating of 1.
        is_valid_rating = (
            isinstance(rating, int)
            and not isinstance(rating, bool)
            and 1 <= rating <= 5
        )
        if not is_valid_rating:
            return Response(
                {'error': 'Rating must be an integer between 1 and 5'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Update installation
        from django.utils import timezone
        installation.rating = rating
        installation.review = review
        installation.reviewed_at = timezone.now()
        installation.save()

        # Update template average rating
        if installation.template:
            template = installation.template
            ratings = list(
                PluginInstallation.objects.filter(
                    template=template,
                    rating__isnull=False
                ).values_list('rating', flat=True)
            )
            if ratings:
                from decimal import Decimal
                template.rating_average = Decimal(sum(ratings)) / Decimal(len(ratings))
                template.rating_count = len(ratings)
                template.save(update_fields=['rating_average', 'rating_count'])

        return Response({
            'message': 'Rating submitted successfully',
            'rating': rating
        })

    def destroy(self, request, *args, **kwargs):
        """
        Uninstall a plugin and respond with 204 No Content.

        Deletes the linked ScheduledTask when there is one, otherwise
        deletes the installation record directly.
        """
        installation = self.get_object()
        if installation.scheduled_task:
            # Deleting the task is expected to cascade to the installation
            # (depends on the model's on_delete rule, not visible here).
            installation.scheduled_task.delete()
        else:
            # If scheduled task was already deleted, just delete the installation
            installation.delete()
        # A 204 response must not carry a body, so none is sent.
        return Response(status=status.HTTP_204_NO_CONTENT)
class EventPluginViewSet(viewsets.ModelViewSet):
    """
    Endpoint for plugins attached to calendar events.

    An event plugin links an installed plugin to an event together with a
    timing trigger (before start, at start, after end, on complete, ...).

    Routes:
    - GET /api/event-plugins/?event_id=X - List plugins for an event
    - POST /api/event-plugins/ - Attach plugin to event
    - PATCH /api/event-plugins/{id}/ - Update timing/trigger
    - DELETE /api/event-plugins/{id}/ - Remove plugin from event
    - POST /api/event-plugins/{id}/toggle/ - Enable/disable plugin
    """
    queryset = EventPlugin.objects.select_related(
        'event',
        'plugin_installation',
        'plugin_installation__template'
    ).all()
    serializer_class = EventPluginSerializer
    permission_classes = [AllowAny]  # TODO: Change to IsAuthenticated

    def get_queryset(self):
        """Return event plugins in execution order, limited to ``event_id`` when given."""
        event_plugins = super().get_queryset()
        requested_event = self.request.query_params.get('event_id')
        if requested_event:
            event_plugins = event_plugins.filter(event_id=requested_event)
        return event_plugins.order_by('execution_order', 'created_at')

    def list(self, request):
        """
        List the plugins attached to one event.

        The ``event_id`` query parameter is mandatory; a request without it
        gets a 400. The result is returned as a plain list.
        """
        if not request.query_params.get('event_id'):
            return Response({
                'error': 'event_id query parameter is required'
            }, status=status.HTTP_400_BAD_REQUEST)

        event_plugins = self.get_queryset()
        return Response(self.get_serializer(event_plugins, many=True).data)

    @action(detail=True, methods=['post'])
    def toggle(self, request, pk=None):
        """Flip ``is_active`` on an event plugin and return the updated record."""
        plugin_link = self.get_object()
        plugin_link.is_active = not plugin_link.is_active
        plugin_link.save(update_fields=['is_active'])
        return Response(self.get_serializer(plugin_link).data)

    @action(detail=False, methods=['get'])
    def triggers(self, request):
        """
        Describe the trigger options available to the UI.

        The payload contains the trigger choices with their labels, a set
        of common offset presets (values such as 60 are labelled '1 hour',
        i.e. minutes) and the grouping of triggers into timing sections.
        """
        # (offset value, label)
        presets = (
            (0, 'Immediately'),
            (5, '5 minutes'),
            (10, '10 minutes'),
            (15, '15 minutes'),
            (30, '30 minutes'),
            (60, '1 hour'),
            (120, '2 hours'),
            (1440, '1 day'),
        )
        # (group label, trigger values in the group, whether an offset applies)
        groups = (
            ('Before Event', ['before_start'], True),
            ('During Event', ['at_start', 'after_start'], True),
            ('After Event', ['after_end'], True),
            ('Status Changes', ['on_complete', 'on_cancel'], False),
        )

        trigger_options = [
            {'value': value, 'label': label}
            for value, label in EventPlugin.Trigger.choices
        ]
        offset_presets = [
            {'value': value, 'label': label}
            for value, label in presets
        ]
        timing_groups = [
            {
                'label': label,
                'triggers': trigger_values,
                'supports_offset': supports_offset,
            }
            for label, trigger_values, supports_offset in groups
        ]

        return Response({
            'triggers': trigger_options,
            'offset_presets': offset_presets,
            'timing_groups': timing_groups,
        })
class GlobalEventPluginViewSet(viewsets.ModelViewSet):
    """
    CRUD and helper endpoints for global event plugin rules.

    A global event plugin is a rule that attaches a plugin to ALL events,
    covering both events that already exist and events created later.
    Typical uses are across-the-board automations such as:
    - Sending confirmation emails for all appointments
    - Logging all event completions
    - Running cleanup after every event

    Endpoints:
    - GET /api/global-event-plugins/ - List all global rules
    - POST /api/global-event-plugins/ - Create rule (auto-applies to existing events)
    - GET /api/global-event-plugins/{id}/ - Get rule details
    - PATCH /api/global-event-plugins/{id}/ - Update rule
    - DELETE /api/global-event-plugins/{id}/ - Delete rule
    - POST /api/global-event-plugins/{id}/toggle/ - Enable/disable rule
    - POST /api/global-event-plugins/{id}/reapply/ - Reapply to all events
    """
    queryset = GlobalEventPlugin.objects.select_related(
        'plugin_installation',
        'plugin_installation__template',
        'created_by'
    ).all()
    serializer_class = GlobalEventPluginSerializer
    permission_classes = [AllowAny]  # TODO: Change to IsAuthenticated

    def get_queryset(self):
        """
        Return the rules ordered by ``execution_order`` then ``created_at``.

        When the ``is_active`` query parameter is supplied, the rules are
        filtered on it: the value "true" (any letter case) selects active
        rules and every other value selects inactive ones.
        """
        rules = super().get_queryset()
        active_param = self.request.query_params.get('is_active')
        if active_param is not None:
            wants_active = active_param.lower() == 'true'
            rules = rules.filter(is_active=wants_active)
        return rules.order_by('execution_order', 'created_at')

    def perform_create(self, serializer):
        """Save the new rule, recording the requester as creator when logged in."""
        creator = self.request.user
        if not creator.is_authenticated:
            # Anonymous requests are stored without a creator.
            creator = None
        serializer.save(created_by=creator)

    @action(detail=True, methods=['post'])
    def toggle(self, request, pk=None):
        """Invert the rule's ``is_active`` flag and return the serialized rule."""
        rule = self.get_object()
        rule.is_active = not rule.is_active
        # Persist the flag together with the modification timestamp column.
        rule.save(update_fields=['is_active', 'updated_at'])
        return Response(self.get_serializer(rule).data)

    @action(detail=True, methods=['post'])
    def reapply(self, request, pk=None):
        """
        Apply this global rule to all events again.

        Handy when:
        - Events were created while the rule was inactive
        - Plugin attachments were manually removed

        An inactive rule is rejected with HTTP 400. Otherwise the response
        reports the count returned by ``apply_to_all_events()``.
        """
        rule = self.get_object()
        if rule.is_active:
            count = rule.apply_to_all_events()
            return Response({
                'message': f'Applied to {count} events',
                'events_affected': count
            })
        return Response(
            {'error': 'Cannot reapply inactive rule. Enable it first.'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    @action(detail=False, methods=['get'])
    def triggers(self, request):
        """
        Describe the trigger options the UI can offer for global rules.

        The trigger list is built from ``EventPlugin.Trigger.choices``. The
        offset presets offered here run from "Immediately" up to "1 hour".
        """
        trigger_options = [
            {'value': value, 'label': label}
            for value, label in EventPlugin.Trigger.choices
        ]
        preset_pairs = (
            (0, 'Immediately'),
            (5, '5 minutes'),
            (10, '10 minutes'),
            (15, '15 minutes'),
            (30, '30 minutes'),
            (60, '1 hour'),
        )
        offset_options = [
            {'value': offset, 'label': caption}
            for offset, caption in preset_pairs
        ]
        return Response({
            'triggers': trigger_options,
            'offset_presets': offset_options,
        })
class EmailTemplateViewSet(viewsets.ModelViewSet):
    """
    API endpoint for managing email templates.

    Email templates can be used by plugins to send customized emails.
    Templates support variable substitution for dynamic content.

    Access Control:
    - Business users see only BUSINESS scope templates (their own tenant's)
    - Platform users can also see/create PLATFORM scope templates (shared)

    Endpoints:
    - GET /api/email-templates/ - List templates (filtered by scope/category)
    - POST /api/email-templates/ - Create template
    - GET /api/email-templates/{id}/ - Get template details
    - PATCH /api/email-templates/{id}/ - Update template
    - DELETE /api/email-templates/{id}/ - Delete template
    - POST /api/email-templates/preview/ - Render preview with sample data
    - POST /api/email-templates/{id}/duplicate/ - Create a copy
    - GET /api/email-templates/variables/ - Get available template variables
    """
    queryset = EmailTemplate.objects.all()
    serializer_class = EmailTemplateSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """
        Filter templates based on user type and query params.

        Query params:
        - scope: honoured for platform users only (upper-cased before filtering)
        - category: honoured for everyone (upper-cased before filtering)

        Users that are not platform users are always limited to BUSINESS
        scope templates. Results are ordered by name.
        """
        user = self.request.user
        queryset = super().get_queryset()
        # Platform users see all templates
        if hasattr(user, 'is_platform_user') and user.is_platform_user:
            scope = self.request.query_params.get('scope')
            if scope:
                queryset = queryset.filter(scope=scope.upper())
        else:
            # Business users only see BUSINESS scope templates
            queryset = queryset.filter(scope=EmailTemplate.Scope.BUSINESS)
        # Filter by category if specified
        category = self.request.query_params.get('category')
        if category:
            queryset = queryset.filter(category=category.upper())
        return queryset.order_by('name')

    def get_serializer_class(self):
        """Use the lightweight serializer for the list view, the full one otherwise."""
        if self.action == 'list':
            return EmailTemplateListSerializer
        return EmailTemplateSerializer

    def perform_create(self, serializer):
        """
        Set created_by from the request user.

        NOTE(review): the template scope is not checked here; whether a
        business user can create a PLATFORM template depends on the
        serializer -- confirm.
        """
        serializer.save(created_by=self.request.user)

    @action(detail=False, methods=['post'])
    def preview(self, request):
        """
        Render a preview of the template with sample data.

        Request body:
        {
            "subject": "Hello {{CUSTOMER_NAME}}",
            "html_content": "<p>Your appointment is on {{APPOINTMENT_DATE}}</p>",
            "text_content": "Your appointment is on {{APPOINTMENT_DATE}}",
            "context": {"CUSTOMER_NAME": "John"}  // optional overrides
        }

        Built-in sample values are used for every variable the caller does
        not override. Response includes rendered content with a
        force_footer flag; when the flag is set (non-platform user on a
        FREE tier tenant) the footer is already appended to the HTML and
        text bodies.
        """
        serializer = EmailTemplatePreviewSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        from .template_parser import TemplateVariableParser
        from datetime import datetime

        data = serializer.validated_data
        # Treat a missing or null context as "no overrides".
        context = data.get('context') or {}
        subject = data['subject']
        html = data.get('html_content', '')
        text = data.get('text_content', '')

        # Read the clock once so TODAY and NOW always describe the same instant.
        now = datetime.now()

        # Add default sample values for preview
        default_context = {
            'BUSINESS_NAME': 'Demo Business',
            'BUSINESS_EMAIL': 'contact@demo.com',
            'BUSINESS_PHONE': '(555) 123-4567',
            'CUSTOMER_NAME': 'John Doe',
            'CUSTOMER_EMAIL': 'john@example.com',
            'APPOINTMENT_TIME': 'Monday, January 15, 2025 at 2:00 PM',
            'APPOINTMENT_DATE': 'January 15, 2025',
            'APPOINTMENT_SERVICE': 'Consultation',
            'TODAY': now.strftime('%B %d, %Y'),
            'NOW': now.strftime('%B %d, %Y at %I:%M %p'),
        }
        default_context.update(context)

        rendered_subject = TemplateVariableParser.replace_insertion_codes(subject, default_context)
        rendered_html = TemplateVariableParser.replace_insertion_codes(html, default_context) if html else ''
        rendered_text = TemplateVariableParser.replace_insertion_codes(text, default_context) if text else ''

        # Check if free tier - append footer
        force_footer = False
        user = request.user
        if hasattr(user, 'is_platform_user') and not user.is_platform_user:
            from django.db import connection
            # The connection may have no tenant, or a tenant object without
            # a subscription tier; neither counts as the free tier.
            tenant = getattr(connection, 'tenant', None)
            force_footer = getattr(tenant, 'subscription_tier', None) == 'FREE'

        if force_footer:
            # Create a temporary instance just to use the footer methods
            temp = EmailTemplate()
            rendered_html = temp._append_html_footer(rendered_html)
            rendered_text = temp._append_text_footer(rendered_text)

        return Response({
            'subject': rendered_subject,
            'html_content': rendered_html,
            'text_content': rendered_text,
            'force_footer': force_footer,
        })

    @action(detail=True, methods=['post'])
    def duplicate(self, request, pk=None):
        """
        Create a copy of an existing template.

        The copy will have " (Copy)" appended to its name, keeps the
        source template's content, scope, category and preview context,
        and is owned by the requesting user. Returns the new template
        with HTTP 201.
        """
        template = self.get_object()
        new_template = EmailTemplate.objects.create(
            name=f"{template.name} (Copy)",
            description=template.description,
            subject=template.subject,
            html_content=template.html_content,
            text_content=template.text_content,
            scope=template.scope,
            category=template.category,
            preview_context=template.preview_context,
            created_by=request.user,
        )
        # Build the serializer through the view so it receives the same
        # request context as every other response from this viewset.
        serializer = self.get_serializer(new_template)
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    @action(detail=False, methods=['get'])
    def variables(self, request):
        """
        Get available template variables for the email template editor.

        Returns variables grouped by category with descriptions, plus the
        list of category values from ``EmailTemplate.Category.choices``.
        """
        return Response({
            'variables': [
                {
                    'category': 'Business',
                    'items': [
                        {'code': '{{BUSINESS_NAME}}', 'description': 'Business name'},
                        {'code': '{{BUSINESS_EMAIL}}', 'description': 'Business contact email'},
                        {'code': '{{BUSINESS_PHONE}}', 'description': 'Business phone number'},
                    ]
                },
                {
                    'category': 'Customer',
                    'items': [
                        {'code': '{{CUSTOMER_NAME}}', 'description': 'Customer full name'},
                        {'code': '{{CUSTOMER_EMAIL}}', 'description': 'Customer email address'},
                    ]
                },
                {
                    'category': 'Appointment',
                    'items': [
                        {'code': '{{APPOINTMENT_TIME}}', 'description': 'Full date and time'},
                        {'code': '{{APPOINTMENT_DATE}}', 'description': 'Date only'},
                        {'code': '{{APPOINTMENT_SERVICE}}', 'description': 'Service name'},
                    ]
                },
                {
                    'category': 'Date/Time',
                    'items': [
                        {'code': '{{TODAY}}', 'description': 'Current date'},
                        {'code': '{{NOW}}', 'description': 'Current date and time'},
                    ]
                },
            ],
            'categories': [choice[0] for choice in EmailTemplate.Category.choices],
        })