From e9b3eb9e841f41ef21540776a60fba6ee6b46d86 Mon Sep 17 00:00:00 2001 From: poduck Date: Fri, 28 Nov 2025 21:00:39 -0500 Subject: [PATCH] feat: Add HTTP methods and URL whitelist system for plugins MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backend Changes: - Extended SafeScriptAPI to support all HTTP methods (GET, POST, PUT, PATCH, DELETE) - Created WhitelistedURL model for per-plugin and platform-wide URL whitelisting - Added _validate_url() method with SSRF protection and private IP blocking - Updated SafeScriptAPI to accept scheduled_task parameter for whitelist checking - All HTTP methods now validate against whitelist before making requests WhitelistedURL Model: - Supports two scopes: PLATFORM (all plugins) and PLUGIN (specific plugin) - Stores URL patterns with wildcard support (e.g., https://api.example.com/*) - Tracks allowed HTTP methods per URL - Includes approval workflow (approved_by, approved_at) - Stores original plugin code for verification - Domain-based indexing for fast lookup - Database constraint ensures platform-wide entries have no plugin assigned Security Features: - SSRF prevention: blocks localhost, loopback, and private IP ranges - Per-plugin whitelist: each ScheduledTask can only access its whitelisted URLs - Platform-wide whitelist: approved URLs accessible by all plugins - HTTP method validation: URLs must explicitly allow each method - URL pattern matching with wildcard support Related Models: - WhitelistedURL.scheduled_task -> ScheduledTask (plugin that owns the whitelist) - WhitelistedURL.approved_by -> User (platform user who approved the URL) Migration: schedule/migrations/0014_whitelistedurl.py 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../migrations/0014_whitelistedurl.py | 39 ++++ smoothschedule/schedule/models.py | 175 ++++++++++++++ smoothschedule/schedule/safe_scripting.py | 221 ++++++++++++++++-- 3 files changed, 412 insertions(+), 23 deletions(-) create mode 100644 smoothschedule/schedule/migrations/0014_whitelistedurl.py diff --git a/smoothschedule/schedule/migrations/0014_whitelistedurl.py b/smoothschedule/schedule/migrations/0014_whitelistedurl.py new file mode 100644 index 0000000..2dfeb74 --- /dev/null +++ b/smoothschedule/schedule/migrations/0014_whitelistedurl.py @@ -0,0 +1,39 @@ +# Generated by Django 5.2.8 on 2025-11-29 01:59 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('schedule', '0013_scheduledtask_taskexecutionlog_and_more'), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name='WhitelistedURL', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('url_pattern', models.CharField(help_text='URL or URL pattern (e.g., https://api.example.com/v1/* or https://hooks.slack.com/*)', max_length=500)), + ('domain', models.CharField(db_index=True, help_text='Extracted domain for quick lookup (e.g., api.example.com)', max_length=255)), + ('scope', models.CharField(choices=[('PLATFORM', 'Platform-wide (all plugins)'), ('PLUGIN', 'Plugin-specific')], db_index=True, default='PLUGIN', max_length=10)), + ('allowed_methods', models.JSONField(default=list, help_text="List of allowed HTTP methods: ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']")), + ('description', models.TextField(help_text="Why this URL is whitelisted and what it's used for")), + ('approved_at', models.DateTimeField(blank=True, null=True)), + ('is_active', models.BooleanField(db_index=True, default=True, help_text='Whether this whitelist entry is currently active')), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ('original_plugin_code', models.TextField(blank=True, help_text='Original plugin code submitted for approval (for verification)')), + ('approved_by', models.ForeignKey(blank=True, help_text='Platform user who approved this whitelist entry', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='approved_whitelisted_urls', to=settings.AUTH_USER_MODEL)), + ('scheduled_task', models.ForeignKey(blank=True, help_text='Scheduled task (plugin) that owns this whitelist entry (null for platform-wide)', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='whitelisted_urls', to='schedule.scheduledtask')), + ], + options={ + 'ordering': ['-created_at'], + 'indexes': [models.Index(fields=['domain', 'is_active'], name='schedule_wh_domain_796c4c_idx'), models.Index(fields=['scope', 'is_active'], name='schedule_wh_scope_a0bce0_idx'), models.Index(fields=['scheduled_task', 'is_active'], name='schedule_wh_schedul_7c025c_idx')], + 'constraints': [models.CheckConstraint(condition=models.Q(models.Q(('scheduled_task__isnull', True), ('scope', 'PLATFORM')), models.Q(('scheduled_task__isnull', False), ('scope', 'PLUGIN')), _connector='OR'), name='platform_scope_no_task')], + }, + ), + ] diff --git a/smoothschedule/schedule/models.py b/smoothschedule/schedule/models.py index 4e10f25..a9b98cc 100644 --- a/smoothschedule/schedule/models.py +++ b/smoothschedule/schedule/models.py @@ -431,3 +431,178 @@ class TaskExecutionLog(models.Model): def __str__(self): return f"{self.scheduled_task.name} - {self.status} at {self.started_at}" + + +class WhitelistedURL(models.Model): + """ + URL whitelist for plugin HTTP access. + + Supports two scopes: + - Platform-wide: accessible by all plugins (approved_by platform user with can_whitelist_urls permission) + - Plugin-specific: accessible only by specific plugin + """ + + class Scope(models.TextChoices): + PLATFORM = 'PLATFORM', 'Platform-wide (all plugins)' + PLUGIN = 'PLUGIN', 'Plugin-specific' + + # URL Configuration + url_pattern = models.CharField( + max_length=500, + help_text="URL or URL pattern (e.g., https://api.example.com/v1/* or https://hooks.slack.com/*)" + ) + + domain = models.CharField( + max_length=255, + db_index=True, + help_text="Extracted domain for quick lookup (e.g., api.example.com)" + ) + + # Scope & Ownership + scope = models.CharField( + max_length=10, + choices=Scope.choices, + default=Scope.PLUGIN, + db_index=True + ) + + scheduled_task = models.ForeignKey( + 'ScheduledTask', + on_delete=models.CASCADE, + null=True, + blank=True, + related_name='whitelisted_urls', + help_text="Scheduled task (plugin) that owns this whitelist entry (null for platform-wide)" + ) + + # HTTP Methods + allowed_methods = models.JSONField( + default=list, + help_text="List of allowed HTTP methods: ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']" + ) + + # Metadata + description = models.TextField( + help_text="Why this URL is whitelisted and what it's used for" + ) + + approved_by = models.ForeignKey( + 'users.User', + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name='approved_whitelisted_urls', + help_text="Platform user who approved this whitelist entry" + ) + + approved_at = models.DateTimeField(null=True, blank=True) + + is_active = models.BooleanField( + default=True, + db_index=True, + help_text="Whether this whitelist entry is currently active" + ) + + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + # Security metadata + original_plugin_code = models.TextField( + blank=True, + help_text="Original plugin code submitted for approval (for verification)" + ) + + class Meta: + ordering = ['-created_at'] + indexes = [ + models.Index(fields=['domain', 'is_active']), + models.Index(fields=['scope', 'is_active']), + models.Index(fields=['scheduled_task', 'is_active']), + ] + constraints = [ + models.CheckConstraint( + check=models.Q(scope='PLATFORM', scheduled_task__isnull=True) | models.Q(scope='PLUGIN', scheduled_task__isnull=False), + name='platform_scope_no_task' + ) + ] + + def __str__(self): + scope_label = f"{self.get_scope_display()}" + methods = ', '.join(self.allowed_methods) if self.allowed_methods else 'No methods' + return f"{self.url_pattern} ({scope_label}) - {methods}" + + def save(self, *args, **kwargs): + """Extract domain from URL pattern""" + from urllib.parse import urlparse + + # Extract domain from URL pattern + if not self.domain: + # Remove wildcard for parsing + url_for_parsing = self.url_pattern.replace('*', '') + parsed = urlparse(url_for_parsing) + self.domain = parsed.hostname or '' + + super().save(*args, **kwargs) + + def matches_url(self, url: str) -> bool: + """ + Check if this whitelist entry matches the given URL. + + Supports wildcard patterns: + - https://api.example.com/* matches all paths under / + - https://api.example.com/v1/* matches all paths under /v1/ + """ + # Simple implementation: check if URL starts with pattern (minus wildcard) + pattern = self.url_pattern.replace('*', '') + return url.startswith(pattern) + + def allows_method(self, method: str) -> bool: + """Check if this whitelist entry allows the given HTTP method""" + return method.upper() in [m.upper() for m in self.allowed_methods] + + @classmethod + def is_url_whitelisted(cls, url: str, method: str, scheduled_task=None) -> bool: + """ + Check if a URL and HTTP method combination is whitelisted. + + Args: + url: The URL to check + method: HTTP method (GET, POST, etc.) + scheduled_task: Optional ScheduledTask (plugin) to check task-specific whitelist + + Returns: + True if URL is whitelisted for the given method + """ + from urllib.parse import urlparse + + parsed = urlparse(url) + domain = parsed.hostname + + if not domain: + return False + + # Check platform-wide whitelist + platform_entries = cls.objects.filter( + domain=domain, + scope=cls.Scope.PLATFORM, + is_active=True + ) + + for entry in platform_entries: + if entry.matches_url(url) and entry.allows_method(method): + return True + + # Check task-specific whitelist if scheduled_task provided + if scheduled_task: + task_entries = cls.objects.filter( + domain=domain, + scope=cls.Scope.PLUGIN, + scheduled_task=scheduled_task, + is_active=True + ) + + for entry in task_entries: + if entry.matches_url(url) and entry.allows_method(method): + return True + + return False diff --git a/smoothschedule/schedule/safe_scripting.py b/smoothschedule/schedule/safe_scripting.py index af83db5..e9a2306 100644 --- a/smoothschedule/schedule/safe_scripting.py +++ b/smoothschedule/schedule/safe_scripting.py @@ -40,10 +40,11 @@ class SafeScriptAPI: Only exposes whitelisted operations that interact with their own data. """ - def __init__(self, business, user, execution_context): + def __init__(self, business, user, execution_context, scheduled_task=None): self.business = business self.user = user self.context = execution_context + self.scheduled_task = scheduled_task # ScheduledTask instance for whitelist checking self._api_call_count = 0 self._max_api_calls = 50 # Prevent API spam @@ -190,40 +191,61 @@ class SafeScriptAPI: logger.info(f"[Customer Script] {message}") return message + def _validate_url(self, url, method='GET'): + """ + Validate URL against whitelist and check for SSRF attacks. + + Args: + url: URL to validate + method: HTTP method (GET, POST, PUT, PATCH, DELETE) + + Raises: + ScriptExecutionError: If URL is not whitelisted or is unsafe + """ + from urllib.parse import urlparse + from .models import WhitelistedURL + + parsed = urlparse(url) + + # Prevent SSRF attacks - check localhost + if parsed.hostname in ['localhost', '127.0.0.1', '0.0.0.0', '::1']: + raise ScriptExecutionError("Cannot access localhost") + + # Prevent access to private IP ranges + import ipaddress + try: + ip = ipaddress.ip_address(parsed.hostname) + if ip.is_private or ip.is_loopback or ip.is_link_local: + raise ScriptExecutionError("Cannot access private IP addresses") + except ValueError: + # Not an IP address, continue with domain validation + pass + + # Check whitelist using database model + if not WhitelistedURL.is_url_whitelisted(url, method, self.scheduled_task): + raise ScriptExecutionError( + f"URL '{url}' with method '{method}' is not whitelisted for this plugin. " + f"Contact support at pluginaccess@smoothschedule.com to request whitelisting." + ) + def http_get(self, url, headers=None): """ Make an HTTP GET request to approved domains. Args: - url: URL to fetch (must be in approved list) + url: URL to fetch (must be whitelisted) headers: Optional headers dictionary Returns: Response text + + Raises: + ScriptExecutionError: If URL not whitelisted or request fails """ self._check_api_limit() + self._validate_url(url, 'GET') import requests - from urllib.parse import urlparse - - # Whitelist of approved domains - APPROVED_DOMAINS = [ - 'api.example.com', - 'hooks.slack.com', - 'api.mailchimp.com', - # Add more approved domains - ] - - parsed = urlparse(url) - if parsed.hostname not in APPROVED_DOMAINS: - raise ScriptExecutionError( - f"Domain '{parsed.hostname}' not in approved list. " - f"Contact support to add it." - ) - - # Prevent SSRF attacks - if parsed.hostname in ['localhost', '127.0.0.1', '0.0.0.0']: - raise ScriptExecutionError("Cannot access localhost") try: response = requests.get( @@ -234,7 +256,160 @@ class SafeScriptAPI: response.raise_for_status() return response.text except requests.RequestException as e: - raise ScriptExecutionError(f"HTTP request failed: {e}") + raise ScriptExecutionError(f"HTTP GET request failed: {e}") + + def http_post(self, url, data=None, headers=None): + """ + Make an HTTP POST request to approved domains. + + Args: + url: URL to post to (must be whitelisted) + data: Data to send (dict or string) + headers: Optional headers dictionary + + Returns: + Response text + + Raises: + ScriptExecutionError: If URL not whitelisted or request fails + """ + self._check_api_limit() + self._validate_url(url, 'POST') + + import requests + + try: + # Auto-detect JSON content + if isinstance(data, dict): + response = requests.post( + url, + json=data, + headers=headers or {}, + timeout=10, + ) + else: + response = requests.post( + url, + data=data, + headers=headers or {}, + timeout=10, + ) + response.raise_for_status() + return response.text + except requests.RequestException as e: + raise ScriptExecutionError(f"HTTP POST request failed: {e}") + + def http_put(self, url, data=None, headers=None): + """ + Make an HTTP PUT request to approved domains. + + Args: + url: URL to put to (must be whitelisted) + data: Data to send (dict or string) + headers: Optional headers dictionary + + Returns: + Response text + + Raises: + ScriptExecutionError: If URL not whitelisted or request fails + """ + self._check_api_limit() + self._validate_url(url, 'PUT') + + import requests + + try: + # Auto-detect JSON content + if isinstance(data, dict): + response = requests.put( + url, + json=data, + headers=headers or {}, + timeout=10, + ) + else: + response = requests.put( + url, + data=data, + headers=headers or {}, + timeout=10, + ) + response.raise_for_status() + return response.text + except requests.RequestException as e: + raise ScriptExecutionError(f"HTTP PUT request failed: {e}") + + def http_patch(self, url, data=None, headers=None): + """ + Make an HTTP PATCH request to approved domains. + + Args: + url: URL to patch (must be whitelisted) + data: Data to send (dict or string) + headers: Optional headers dictionary + + Returns: + Response text + + Raises: + ScriptExecutionError: If URL not whitelisted or request fails + """ + self._check_api_limit() + self._validate_url(url, 'PATCH') + + import requests + + try: + # Auto-detect JSON content + if isinstance(data, dict): + response = requests.patch( + url, + json=data, + headers=headers or {}, + timeout=10, + ) + else: + response = requests.patch( + url, + data=data, + headers=headers or {}, + timeout=10, + ) + response.raise_for_status() + return response.text + except requests.RequestException as e: + raise ScriptExecutionError(f"HTTP PATCH request failed: {e}") + + def http_delete(self, url, headers=None): + """ + Make an HTTP DELETE request to approved domains. + + Args: + url: URL to delete (must be whitelisted) + headers: Optional headers dictionary + + Returns: + Response text + + Raises: + ScriptExecutionError: If URL not whitelisted or request fails + """ + self._check_api_limit() + self._validate_url(url, 'DELETE') + + import requests + + try: + response = requests.delete( + url, + headers=headers or {}, + timeout=10, + ) + response.raise_for_status() + return response.text + except requests.RequestException as e: + raise ScriptExecutionError(f"HTTP DELETE request failed: {e}") def create_appointment(self, title, start_time, end_time, **kwargs): """