diff --git a/smoothschedule/schedule/safe_scripting.py b/smoothschedule/schedule/safe_scripting.py index e9a2306..58faa50 100644 --- a/smoothschedule/schedule/safe_scripting.py +++ b/smoothschedule/schedule/safe_scripting.py @@ -713,6 +713,135 @@ class SafeScriptEngine: return ast.unparse(transformed) +def analyze_plugin_http_calls(script: str) -> List[Dict[str, Any]]: + """ + Analyze plugin code to detect HTTP method calls. + + Args: + script: Plugin code to analyze + + Returns: + List of dictionaries with detected HTTP calls: + [ + { + 'method': 'GET', # HTTP method + 'url': 'https://api.example.com/data', # URL if detectable + 'line': 5, # Line number + }, + ... + ] + + Raises: + SyntaxError: If code doesn't parse + """ + http_calls = [] + + try: + tree = ast.parse(script) + except SyntaxError as e: + raise SyntaxError(f"Invalid Python syntax: {e}") + + # HTTP methods to detect + http_methods = { + 'http_get': 'GET', + 'http_post': 'POST', + 'http_put': 'PUT', + 'http_patch': 'PATCH', + 'http_delete': 'DELETE', + } + + for node in ast.walk(tree): + if isinstance(node, ast.Call): + # Check if it's an API call like api.http_get(...) + if isinstance(node.func, ast.Attribute): + method_name = node.func.attr + if method_name in http_methods: + http_method = http_methods[method_name] + + # Try to extract the URL argument (first positional argument) + url = None + if node.args and len(node.args) > 0: + first_arg = node.args[0] + # If it's a string literal, extract it + if isinstance(first_arg, ast.Constant) and isinstance(first_arg.value, str): + url = first_arg.value + # If it's an f-string or JoinedStr + elif isinstance(first_arg, ast.JoinedStr): + # Can't statically determine f-string URLs + url = '' + # If it's a Name (variable) + elif isinstance(first_arg, ast.Name): + url = f'' + else: + url = '' + + http_calls.append({ + 'method': http_method, + 'url': url, + 'line': node.lineno, + }) + + return http_calls + + +def validate_plugin_whitelist(script: str, scheduled_task=None) -> Dict[str, Any]: + """ + Validate that all HTTP calls in plugin code are whitelisted. + + Args: + script: Plugin code to validate + scheduled_task: ScheduledTask instance (optional, for plugin-specific whitelist) + + Returns: + Dictionary with validation results: + { + 'valid': bool, + 'errors': List[str], # Validation error messages + 'warnings': List[str], # Warnings (e.g., dynamic URLs) + 'http_calls': List[Dict], # Detected HTTP calls + } + """ + from .models import WhitelistedURL + + result = { + 'valid': True, + 'errors': [], + 'warnings': [], + 'http_calls': [], + } + + try: + http_calls = analyze_plugin_http_calls(script) + result['http_calls'] = http_calls + + for call in http_calls: + url = call['url'] + method = call['method'] + line = call['line'] + + # Skip if URL is dynamic (can't validate statically) + if not url or url.startswith('<'): + result['warnings'].append( + f"Line {line}: {method} request uses dynamic URL - cannot validate at upload time. " + f"Ensure URL is whitelisted before running." + ) + continue + + # Check if URL is whitelisted + if not WhitelistedURL.is_url_whitelisted(url, method, scheduled_task): + result['valid'] = False + result['errors'].append( + f"Line {line}: {method} request to '{url}' is not whitelisted. " + f"Contact support at pluginaccess@smoothschedule.com to request whitelisting." + ) + + except SyntaxError as e: + result['valid'] = False + result['errors'].append(f"Syntax error: {str(e)}") + + return result + + def test_script_execution(): """Test the safe script engine"""