feat: Add plugin code analysis and HTTP whitelist validation
Implemented static code analysis to detect and validate HTTP calls in plugins before they are uploaded or approved for the marketplace. **New Functions:** 1. **analyze_plugin_http_calls(script):** - Parses plugin code using Python AST - Detects all api.http_* method calls (GET, POST, PUT, PATCH, DELETE) - Extracts URL from first argument if it's a string literal - Handles dynamic URLs (f-strings, variables) with appropriate warnings - Returns list of HTTP calls with method, URL, and line number 2. **validate_plugin_whitelist(script, scheduled_task):** - Analyzes plugin code for HTTP calls - Validates each detected URL against WhitelistedURL model - Checks both platform-wide and plugin-specific whitelists - Returns validation results with errors, warnings, and detected calls - Provides clear error messages with line numbers **Validation Logic:** - **Static URLs** (string literals): Validated against whitelist, error if not found - **Dynamic URLs** (f-strings, variables): Warning issued, runtime validation required - **Syntax Errors**: Caught and reported as validation errors - **Line Numbers**: All errors/warnings include line number for debugging **Use Cases:** 1. Pre-upload validation: Check plugin before saving to database 2. Approval workflow: Platform staff can see which URLs need whitelisting 3. Marketplace submission: Reject plugins with non-whitelisted URLs 4. Security audit: Analyze existing plugins for HTTP call patterns **Error Messages:** - Clear, actionable messages with line numbers - Direct users to pluginaccess@smoothschedule.com for whitelisting - Warns about dynamic URLs that can't be statically validated This enables proactive security enforcement before plugins are executed, preventing runtime failures and improving user experience. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -713,6 +713,135 @@ class SafeScriptEngine:
|
|||||||
return ast.unparse(transformed)
|
return ast.unparse(transformed)
|
||||||
|
|
||||||
|
|
||||||
|
def analyze_plugin_http_calls(script: str) -> List[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Analyze plugin code to detect HTTP method calls.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
script: Plugin code to analyze
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of dictionaries with detected HTTP calls:
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'method': 'GET', # HTTP method
|
||||||
|
'url': 'https://api.example.com/data', # URL if detectable
|
||||||
|
'line': 5, # Line number
|
||||||
|
},
|
||||||
|
...
|
||||||
|
]
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
SyntaxError: If code doesn't parse
|
||||||
|
"""
|
||||||
|
http_calls = []
|
||||||
|
|
||||||
|
try:
|
||||||
|
tree = ast.parse(script)
|
||||||
|
except SyntaxError as e:
|
||||||
|
raise SyntaxError(f"Invalid Python syntax: {e}")
|
||||||
|
|
||||||
|
# HTTP methods to detect
|
||||||
|
http_methods = {
|
||||||
|
'http_get': 'GET',
|
||||||
|
'http_post': 'POST',
|
||||||
|
'http_put': 'PUT',
|
||||||
|
'http_patch': 'PATCH',
|
||||||
|
'http_delete': 'DELETE',
|
||||||
|
}
|
||||||
|
|
||||||
|
for node in ast.walk(tree):
|
||||||
|
if isinstance(node, ast.Call):
|
||||||
|
# Check if it's an API call like api.http_get(...)
|
||||||
|
if isinstance(node.func, ast.Attribute):
|
||||||
|
method_name = node.func.attr
|
||||||
|
if method_name in http_methods:
|
||||||
|
http_method = http_methods[method_name]
|
||||||
|
|
||||||
|
# Try to extract the URL argument (first positional argument)
|
||||||
|
url = None
|
||||||
|
if node.args and len(node.args) > 0:
|
||||||
|
first_arg = node.args[0]
|
||||||
|
# If it's a string literal, extract it
|
||||||
|
if isinstance(first_arg, ast.Constant) and isinstance(first_arg.value, str):
|
||||||
|
url = first_arg.value
|
||||||
|
# If it's an f-string or JoinedStr
|
||||||
|
elif isinstance(first_arg, ast.JoinedStr):
|
||||||
|
# Can't statically determine f-string URLs
|
||||||
|
url = '<dynamic URL - f-string>'
|
||||||
|
# If it's a Name (variable)
|
||||||
|
elif isinstance(first_arg, ast.Name):
|
||||||
|
url = f'<variable: {first_arg.id}>'
|
||||||
|
else:
|
||||||
|
url = '<dynamic URL>'
|
||||||
|
|
||||||
|
http_calls.append({
|
||||||
|
'method': http_method,
|
||||||
|
'url': url,
|
||||||
|
'line': node.lineno,
|
||||||
|
})
|
||||||
|
|
||||||
|
return http_calls
|
||||||
|
|
||||||
|
|
||||||
|
def validate_plugin_whitelist(script: str, scheduled_task=None) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Validate that all HTTP calls in plugin code are whitelisted.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
script: Plugin code to validate
|
||||||
|
scheduled_task: ScheduledTask instance (optional, for plugin-specific whitelist)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with validation results:
|
||||||
|
{
|
||||||
|
'valid': bool,
|
||||||
|
'errors': List[str], # Validation error messages
|
||||||
|
'warnings': List[str], # Warnings (e.g., dynamic URLs)
|
||||||
|
'http_calls': List[Dict], # Detected HTTP calls
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
from .models import WhitelistedURL
|
||||||
|
|
||||||
|
result = {
|
||||||
|
'valid': True,
|
||||||
|
'errors': [],
|
||||||
|
'warnings': [],
|
||||||
|
'http_calls': [],
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
http_calls = analyze_plugin_http_calls(script)
|
||||||
|
result['http_calls'] = http_calls
|
||||||
|
|
||||||
|
for call in http_calls:
|
||||||
|
url = call['url']
|
||||||
|
method = call['method']
|
||||||
|
line = call['line']
|
||||||
|
|
||||||
|
# Skip if URL is dynamic (can't validate statically)
|
||||||
|
if not url or url.startswith('<'):
|
||||||
|
result['warnings'].append(
|
||||||
|
f"Line {line}: {method} request uses dynamic URL - cannot validate at upload time. "
|
||||||
|
f"Ensure URL is whitelisted before running."
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Check if URL is whitelisted
|
||||||
|
if not WhitelistedURL.is_url_whitelisted(url, method, scheduled_task):
|
||||||
|
result['valid'] = False
|
||||||
|
result['errors'].append(
|
||||||
|
f"Line {line}: {method} request to '{url}' is not whitelisted. "
|
||||||
|
f"Contact support at pluginaccess@smoothschedule.com to request whitelisting."
|
||||||
|
)
|
||||||
|
|
||||||
|
except SyntaxError as e:
|
||||||
|
result['valid'] = False
|
||||||
|
result['errors'].append(f"Syntax error: {str(e)}")
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
def test_script_execution():
|
def test_script_execution():
|
||||||
"""Test the safe script engine"""
|
"""Test the safe script engine"""
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user