Add automation run tracking with quota enforcement

- Add track-run action to ActivePieces smoothschedule piece
- Add webhook endpoint to receive run tracking from automations
- Update quota service to increment automation_runs_used count
- Add Celery task for async run tracking
- Update default flows to include track-run step

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
poduck
2025-12-22 02:30:04 -05:00
parent dd24eede87
commit 28d6cee207
11 changed files with 627 additions and 160 deletions

View File

@@ -1 +1 @@
1766280110308 1766388020169

View File

@@ -1,7 +1,7 @@
import { PieceAuth, createPiece } from '@activepieces/pieces-framework'; import { PieceAuth, createPiece } from '@activepieces/pieces-framework';
import { PieceCategory } from '@activepieces/shared'; import { PieceCategory } from '@activepieces/shared';
import { createCustomApiCallAction } from '@activepieces/pieces-common'; import { createCustomApiCallAction } from '@activepieces/pieces-common';
import { createEventAction, findEventsAction, updateEventAction, cancelEventAction } from './lib/actions'; import { createEventAction, findEventsAction, updateEventAction, cancelEventAction, trackRunAction } from './lib/actions';
import { listResourcesAction } from './lib/actions/list-resources'; import { listResourcesAction } from './lib/actions/list-resources';
import { listServicesAction } from './lib/actions/list-services'; import { listServicesAction } from './lib/actions/list-services';
import { listInactiveCustomersAction } from './lib/actions/list-inactive-customers'; import { listInactiveCustomersAction } from './lib/actions/list-inactive-customers';
@@ -68,6 +68,7 @@ export const smoothSchedule = createPiece({
minimumSupportedRelease: '0.36.1', minimumSupportedRelease: '0.36.1',
authors: ['smoothschedule'], authors: ['smoothschedule'],
actions: [ actions: [
trackRunAction,
createEventAction, createEventAction,
updateEventAction, updateEventAction,
cancelEventAction, cancelEventAction,

View File

@@ -6,3 +6,4 @@ export * from './list-resources';
export * from './list-services'; export * from './list-services';
export * from './list-inactive-customers'; export * from './list-inactive-customers';
export * from './list-customers'; export * from './list-customers';
export * from './track-run';

View File

@@ -0,0 +1,86 @@
import { createAction } from '@activepieces/pieces-framework';
import { HttpMethod, httpClient } from '@activepieces/pieces-common';
import { smoothScheduleAuth, SmoothScheduleAuth } from '../../index';
interface TrackRunResponse {
success: boolean;
runs_this_month: number;
limit: number;
remaining: number;
}
/**
* Track Automation Run Action
*
* This action should be placed at the beginning of each automation flow
* to track executions for quota management. It increments the run counter
* for the current flow and returns quota information.
*
* The action:
* 1. Gets the current flow ID from the context
* 2. Calls the SmoothSchedule track-run API endpoint
* 3. Returns quota usage information
*/
export const trackRunAction = createAction({
auth: smoothScheduleAuth,
name: 'track_run',
displayName: 'Track Run',
description:
'Track this automation execution for quota management. Place at the start of each flow.',
props: {},
async run(context) {
const auth = context.auth as SmoothScheduleAuth;
// Get the current flow ID from the Activepieces context
const flowId = context.flows.current.id;
// Build the URL for the track-run endpoint
// The track-run endpoint is at /api/activepieces/track-run/
const url = new URL(auth.props.baseUrl);
let hostHeader = `${url.hostname}${url.port ? ':' + url.port : ''}`;
// Map docker hostname to lvh.me (which Django recognizes)
if (url.hostname === 'django') {
hostHeader = `lvh.me${url.port ? ':' + url.port : ''}`;
}
const trackRunUrl = `${auth.props.baseUrl}/api/activepieces/track-run/`;
try {
const response = await httpClient.sendRequest<TrackRunResponse>({
method: HttpMethod.POST,
url: trackRunUrl,
body: {
flow_id: flowId,
},
headers: {
'X-Tenant': auth.props.subdomain,
Host: hostHeader,
'Content-Type': 'application/json',
},
});
return {
success: response.body.success,
runs_this_month: response.body.runs_this_month,
limit: response.body.limit,
remaining: response.body.remaining,
message:
response.body.limit < 0
? 'Unlimited automation runs'
: `${response.body.remaining} automation runs remaining this month`,
};
} catch (error) {
// Log the error but don't fail the flow - tracking is non-critical
console.error('Failed to track automation run:', error);
return {
success: false,
runs_this_month: -1,
limit: -1,
remaining: -1,
message: 'Failed to track run (flow will continue)',
error: error instanceof Error ? error.message : String(error),
};
}
},
});

View File

@@ -138,8 +138,6 @@ const QuotaSettings: React.FC = () => {
return <Calendar className="h-5 w-5" />; return <Calendar className="h-5 w-5" />;
case 'MAX_AUTOMATION_RUNS': case 'MAX_AUTOMATION_RUNS':
return <Bot className="h-5 w-5" />; return <Bot className="h-5 w-5" />;
case 'MAX_AUTOMATED_TASKS':
return <Bot className="h-5 w-5" />;
default: default:
return <AlertTriangle className="h-5 w-5" />; return <AlertTriangle className="h-5 w-5" />;
} }

View File

@@ -266,7 +266,7 @@ def HasQuota(feature_code):
'MAX_SERVICES': 'max_services', 'MAX_SERVICES': 'max_services',
'MAX_APPOINTMENTS': 'max_appointments', 'MAX_APPOINTMENTS': 'max_appointments',
'MAX_EMAIL_TEMPLATES': 'max_email_templates', 'MAX_EMAIL_TEMPLATES': 'max_email_templates',
'MAX_AUTOMATED_TASKS': 'max_automated_tasks', 'MAX_AUTOMATION_RUNS': 'max_automation_runs',
} }
billing_feature = feature_code_map.get(feature_code, feature_code.lower()) billing_feature = feature_code_map.get(feature_code, feature_code.lower())
limit = tenant.get_limit(billing_feature) limit = tenant.get_limit(billing_feature)

View File

@@ -54,11 +54,6 @@ class QuotaService:
}, },
# Note: MAX_EMAIL_TEMPLATES quota removed - email templates are now system-wide # Note: MAX_EMAIL_TEMPLATES quota removed - email templates are now system-wide
# using PuckEmailTemplate in the messaging app, not per-tenant # using PuckEmailTemplate in the messaging app, not per-tenant
'MAX_AUTOMATED_TASKS': {
'model': 'schedule.models.ScheduledTask',
'display_name': 'automated tasks',
'count_method': 'count_automated_tasks',
},
'MAX_AUTOMATION_RUNS': { 'MAX_AUTOMATION_RUNS': {
'model': None, # No archivable model - informational only 'model': None, # No archivable model - informational only
'display_name': 'automation runs this month', 'display_name': 'automation runs this month',
@@ -94,11 +89,7 @@ class QuotaService:
return Service.objects.filter(is_archived_by_quota=False).count() return Service.objects.filter(is_archived_by_quota=False).count()
# Note: count_email_templates removed - templates are now system-wide via PuckEmailTemplate # Note: count_email_templates removed - templates are now system-wide via PuckEmailTemplate
# Note: count_automated_tasks removed - replaced by count_automation_runs for Activepieces
def count_automated_tasks(self) -> int:
"""Count automated tasks."""
from smoothschedule.scheduling.schedule.models import ScheduledTask
return ScheduledTask.objects.count()
def count_automation_runs(self) -> int: def count_automation_runs(self) -> int:
""" """
@@ -135,7 +126,6 @@ class QuotaService:
'MAX_ADDITIONAL_USERS': 'max_users', 'MAX_ADDITIONAL_USERS': 'max_users',
'MAX_RESOURCES': 'max_resources', 'MAX_RESOURCES': 'max_resources',
'MAX_SERVICES': 'max_services', 'MAX_SERVICES': 'max_services',
'MAX_AUTOMATED_TASKS': 'max_automated_tasks',
'MAX_AUTOMATION_RUNS': 'max_automation_runs', 'MAX_AUTOMATION_RUNS': 'max_automation_runs',
} }
feature_code = feature_code_map.get(quota_type, quota_type.lower()) feature_code = feature_code_map.get(quota_type, quota_type.lower())

View File

@@ -14,7 +14,8 @@ from typing import Dict, Any
# Version for tracking upgrades # Version for tracking upgrades
# 1.0.0 - Initial default flows # 1.0.0 - Initial default flows
# 1.1.0 - Fixed context variable names to match email template tags # 1.1.0 - Fixed context variable names to match email template tags
FLOW_VERSION = "1.1.0" # 1.2.0 - Added Track Run action for quota tracking
FLOW_VERSION = "1.2.0"
# System email types for the send_email action # System email types for the send_email action
EMAIL_TYPES = { EMAIL_TYPES = {
@@ -137,6 +138,38 @@ def get_sample_data_for_flow(flow_type: str) -> Dict[str, Any]:
return FLOW_SAMPLE_DATA.get(flow_type, {}) return FLOW_SAMPLE_DATA.get(flow_type, {})
def _create_track_run_action(next_action: Dict[str, Any]) -> Dict[str, Any]:
"""
Create a track_run action step.
This action should be the first step in every flow to track
automation runs for quota management.
Args:
next_action: The next action in the chain (required)
Returns:
Action definition dict
"""
return {
"name": "track_run",
"displayName": "Track Run",
"type": "PIECE",
"valid": True,
"settings": {
"pieceName": "@activepieces/piece-smoothschedule",
"pieceVersion": "~0.0.1",
"pieceType": "CUSTOM",
"actionName": "track_run",
"input": {},
"inputUiInfo": {
"customizedInputs": {},
},
},
"nextAction": next_action,
}
def _create_send_email_action( def _create_send_email_action(
email_type: str, email_type: str,
step_name: str = "send_email", step_name: str = "send_email",
@@ -203,8 +236,12 @@ def get_appointment_confirmation_flow() -> Dict[str, Any]:
Appointment Confirmation Flow Appointment Confirmation Flow
Trigger: When a new event is created with status SCHEDULED Trigger: When a new event is created with status SCHEDULED
Action: Send appointment confirmation email Action: Track run, then send appointment confirmation email
""" """
send_email_action = _create_send_email_action(
email_type=EMAIL_TYPES["appointment_confirmation"],
step_name="send_confirmation_email",
)
return { return {
"displayName": "Appointment Confirmation Email", "displayName": "Appointment Confirmation Email",
"description": "Automatically send a confirmation email when an appointment is booked", "description": "Automatically send a confirmation email when an appointment is booked",
@@ -223,10 +260,7 @@ def get_appointment_confirmation_flow() -> Dict[str, Any]:
"customizedInputs": {}, "customizedInputs": {},
}, },
}, },
"nextAction": _create_send_email_action( "nextAction": _create_track_run_action(send_email_action),
email_type=EMAIL_TYPES["appointment_confirmation"],
step_name="send_confirmation_email",
),
}, },
"schemaVersion": "1", "schemaVersion": "1",
} }
@@ -237,8 +271,42 @@ def get_appointment_reminder_flow() -> Dict[str, Any]:
Appointment Reminder Flow Appointment Reminder Flow
Trigger: Upcoming events (based on service reminder settings) Trigger: Upcoming events (based on service reminder settings)
Action: Send reminder email Action: Track run, then send reminder email
""" """
send_email_action = {
"name": "send_reminder_email",
"displayName": "Send Reminder Email",
"type": "PIECE",
"valid": True,
"settings": {
"pieceName": "@activepieces/piece-smoothschedule",
"pieceVersion": "~0.0.1",
"pieceType": "CUSTOM",
"actionName": "send_email",
"input": {
"to_email": "{{trigger.customer.email}}",
"template_type": "system",
"email_type": EMAIL_TYPES["appointment_reminder"],
"context": {
"customer_first_name": "{{trigger.customer.first_name}}",
"customer_last_name": "{{trigger.customer.last_name}}",
"customer_name": "{{trigger.customer.first_name}} {{trigger.customer.last_name}}",
"customer_email": "{{trigger.customer.email}}",
"customer_phone": "{{trigger.customer.phone}}",
"service_name": "{{trigger.service.name}}",
"appointment_date": "{{trigger.start_time}}",
"appointment_time": "{{trigger.start_time}}",
"appointment_datetime": "{{trigger.start_time}}",
"staff_name": "{{trigger.resource_name}}",
"location_name": "{{trigger.location}}",
"location_address": "{{trigger.location_address}}",
},
},
"inputUiInfo": {
"customizedInputs": {},
},
},
}
return { return {
"displayName": "Appointment Reminder Email", "displayName": "Appointment Reminder Email",
"description": "Send reminder emails before appointments (based on service settings)", "description": "Send reminder emails before appointments (based on service settings)",
@@ -260,40 +328,7 @@ def get_appointment_reminder_flow() -> Dict[str, Any]:
"customizedInputs": {}, "customizedInputs": {},
}, },
}, },
"nextAction": { "nextAction": _create_track_run_action(send_email_action),
"name": "send_reminder_email",
"displayName": "Send Reminder Email",
"type": "PIECE",
"valid": True,
"settings": {
"pieceName": "@activepieces/piece-smoothschedule",
"pieceVersion": "~0.0.1",
"pieceType": "CUSTOM",
"actionName": "send_email",
"input": {
"to_email": "{{trigger.customer.email}}",
"template_type": "system",
"email_type": EMAIL_TYPES["appointment_reminder"],
"context": {
"customer_first_name": "{{trigger.customer.first_name}}",
"customer_last_name": "{{trigger.customer.last_name}}",
"customer_name": "{{trigger.customer.first_name}} {{trigger.customer.last_name}}",
"customer_email": "{{trigger.customer.email}}",
"customer_phone": "{{trigger.customer.phone}}",
"service_name": "{{trigger.service.name}}",
"appointment_date": "{{trigger.start_time}}",
"appointment_time": "{{trigger.start_time}}",
"appointment_datetime": "{{trigger.start_time}}",
"staff_name": "{{trigger.resource_name}}",
"location_name": "{{trigger.location}}",
"location_address": "{{trigger.location_address}}",
},
},
"inputUiInfo": {
"customizedInputs": {},
},
},
},
}, },
"schemaVersion": "1", "schemaVersion": "1",
} }
@@ -304,8 +339,40 @@ def get_thank_you_flow() -> Dict[str, Any]:
Thank You Email Flow Thank You Email Flow
Trigger: When a final payment is received Trigger: When a final payment is received
Action: Send thank you email Action: Track run, then send thank you email
""" """
send_email_action = {
"name": "send_thank_you_email",
"displayName": "Send Thank You Email",
"type": "PIECE",
"valid": True,
"settings": {
"pieceName": "@activepieces/piece-smoothschedule",
"pieceVersion": "~0.0.1",
"pieceType": "CUSTOM",
"actionName": "send_email",
"input": {
"to_email": "{{trigger.customer.email}}",
"template_type": "system",
"email_type": EMAIL_TYPES["thank_you"],
"context": {
"customer_first_name": "{{trigger.customer.first_name}}",
"customer_last_name": "{{trigger.customer.last_name}}",
"customer_name": "{{trigger.customer.first_name}} {{trigger.customer.last_name}}",
"customer_email": "{{trigger.customer.email}}",
"customer_phone": "{{trigger.customer.phone}}",
"service_name": "{{trigger.service.name}}",
"amount_paid": "{{trigger.amount}}",
"invoice_number": "{{trigger.payment_intent_id}}",
"appointment_date": "{{trigger.event.start_time}}",
"appointment_datetime": "{{trigger.event.start_time}}",
},
},
"inputUiInfo": {
"customizedInputs": {},
},
},
}
return { return {
"displayName": "Thank You Email (After Payment)", "displayName": "Thank You Email (After Payment)",
"description": "Send a thank you email when final payment is completed", "description": "Send a thank you email when final payment is completed",
@@ -326,38 +393,7 @@ def get_thank_you_flow() -> Dict[str, Any]:
"customizedInputs": {}, "customizedInputs": {},
}, },
}, },
"nextAction": { "nextAction": _create_track_run_action(send_email_action),
"name": "send_thank_you_email",
"displayName": "Send Thank You Email",
"type": "PIECE",
"valid": True,
"settings": {
"pieceName": "@activepieces/piece-smoothschedule",
"pieceVersion": "~0.0.1",
"pieceType": "CUSTOM",
"actionName": "send_email",
"input": {
"to_email": "{{trigger.customer.email}}",
"template_type": "system",
"email_type": EMAIL_TYPES["thank_you"],
"context": {
"customer_first_name": "{{trigger.customer.first_name}}",
"customer_last_name": "{{trigger.customer.last_name}}",
"customer_name": "{{trigger.customer.first_name}} {{trigger.customer.last_name}}",
"customer_email": "{{trigger.customer.email}}",
"customer_phone": "{{trigger.customer.phone}}",
"service_name": "{{trigger.service.name}}",
"amount_paid": "{{trigger.amount}}",
"invoice_number": "{{trigger.payment_intent_id}}",
"appointment_date": "{{trigger.event.start_time}}",
"appointment_datetime": "{{trigger.event.start_time}}",
},
},
"inputUiInfo": {
"customizedInputs": {},
},
},
},
}, },
"schemaVersion": "1", "schemaVersion": "1",
} }
@@ -368,8 +404,43 @@ def get_deposit_payment_flow() -> Dict[str, Any]:
Deposit Payment Confirmation Flow Deposit Payment Confirmation Flow
Trigger: When a deposit payment is received Trigger: When a deposit payment is received
Action: Send payment receipt email with deposit-specific subject Action: Track run, then send payment receipt email with deposit-specific subject
""" """
send_email_action = {
"name": "send_deposit_confirmation",
"displayName": "Send Deposit Confirmation",
"type": "PIECE",
"valid": True,
"settings": {
"pieceName": "@activepieces/piece-smoothschedule",
"pieceVersion": "~0.0.1",
"pieceType": "CUSTOM",
"actionName": "send_email",
"input": {
"to_email": "{{trigger.customer.email}}",
"template_type": "system",
"email_type": EMAIL_TYPES["payment_receipt"],
"subject_override": "Deposit Received - {{trigger.service.name}}",
"context": {
"customer_first_name": "{{trigger.customer.first_name}}",
"customer_last_name": "{{trigger.customer.last_name}}",
"customer_name": "{{trigger.customer.first_name}} {{trigger.customer.last_name}}",
"customer_email": "{{trigger.customer.email}}",
"customer_phone": "{{trigger.customer.phone}}",
"service_name": "{{trigger.service.name}}",
"amount_paid": "{{trigger.amount}}",
"invoice_number": "{{trigger.payment_intent_id}}",
"deposit_amount": "{{trigger.amount}}",
"total_paid": "{{trigger.amount}}",
"appointment_date": "{{trigger.event.start_time}}",
"appointment_datetime": "{{trigger.event.start_time}}",
},
},
"inputUiInfo": {
"customizedInputs": {},
},
},
}
return { return {
"displayName": "Deposit Payment Confirmation", "displayName": "Deposit Payment Confirmation",
"description": "Send a confirmation when a deposit payment is received", "description": "Send a confirmation when a deposit payment is received",
@@ -390,41 +461,7 @@ def get_deposit_payment_flow() -> Dict[str, Any]:
"customizedInputs": {}, "customizedInputs": {},
}, },
}, },
"nextAction": { "nextAction": _create_track_run_action(send_email_action),
"name": "send_deposit_confirmation",
"displayName": "Send Deposit Confirmation",
"type": "PIECE",
"valid": True,
"settings": {
"pieceName": "@activepieces/piece-smoothschedule",
"pieceVersion": "~0.0.1",
"pieceType": "CUSTOM",
"actionName": "send_email",
"input": {
"to_email": "{{trigger.customer.email}}",
"template_type": "system",
"email_type": EMAIL_TYPES["payment_receipt"],
"subject_override": "Deposit Received - {{trigger.service.name}}",
"context": {
"customer_first_name": "{{trigger.customer.first_name}}",
"customer_last_name": "{{trigger.customer.last_name}}",
"customer_name": "{{trigger.customer.first_name}} {{trigger.customer.last_name}}",
"customer_email": "{{trigger.customer.email}}",
"customer_phone": "{{trigger.customer.phone}}",
"service_name": "{{trigger.service.name}}",
"amount_paid": "{{trigger.amount}}",
"invoice_number": "{{trigger.payment_intent_id}}",
"deposit_amount": "{{trigger.amount}}",
"total_paid": "{{trigger.amount}}",
"appointment_date": "{{trigger.event.start_time}}",
"appointment_datetime": "{{trigger.event.start_time}}",
},
},
"inputUiInfo": {
"customizedInputs": {},
},
},
},
}, },
"schemaVersion": "1", "schemaVersion": "1",
} }
@@ -435,8 +472,41 @@ def get_final_payment_flow() -> Dict[str, Any]:
Final Payment Confirmation Flow Final Payment Confirmation Flow
Trigger: When a final payment is received Trigger: When a final payment is received
Action: Send payment receipt email Action: Track run, then send payment receipt email
""" """
send_email_action = {
"name": "send_payment_confirmation",
"displayName": "Send Payment Confirmation",
"type": "PIECE",
"valid": True,
"settings": {
"pieceName": "@activepieces/piece-smoothschedule",
"pieceVersion": "~0.0.1",
"pieceType": "CUSTOM",
"actionName": "send_email",
"input": {
"to_email": "{{trigger.customer.email}}",
"template_type": "system",
"email_type": EMAIL_TYPES["payment_receipt"],
"context": {
"customer_first_name": "{{trigger.customer.first_name}}",
"customer_last_name": "{{trigger.customer.last_name}}",
"customer_name": "{{trigger.customer.first_name}} {{trigger.customer.last_name}}",
"customer_email": "{{trigger.customer.email}}",
"customer_phone": "{{trigger.customer.phone}}",
"service_name": "{{trigger.service.name}}",
"amount_paid": "{{trigger.amount}}",
"invoice_number": "{{trigger.payment_intent_id}}",
"total_paid": "{{trigger.amount}}",
"appointment_date": "{{trigger.event.start_time}}",
"appointment_datetime": "{{trigger.event.start_time}}",
},
},
"inputUiInfo": {
"customizedInputs": {},
},
},
}
return { return {
"displayName": "Final Payment Confirmation", "displayName": "Final Payment Confirmation",
"description": "Send a confirmation when the final payment is received", "description": "Send a confirmation when the final payment is received",
@@ -457,39 +527,7 @@ def get_final_payment_flow() -> Dict[str, Any]:
"customizedInputs": {}, "customizedInputs": {},
}, },
}, },
"nextAction": { "nextAction": _create_track_run_action(send_email_action),
"name": "send_payment_confirmation",
"displayName": "Send Payment Confirmation",
"type": "PIECE",
"valid": True,
"settings": {
"pieceName": "@activepieces/piece-smoothschedule",
"pieceVersion": "~0.0.1",
"pieceType": "CUSTOM",
"actionName": "send_email",
"input": {
"to_email": "{{trigger.customer.email}}",
"template_type": "system",
"email_type": EMAIL_TYPES["payment_receipt"],
"context": {
"customer_first_name": "{{trigger.customer.first_name}}",
"customer_last_name": "{{trigger.customer.last_name}}",
"customer_name": "{{trigger.customer.first_name}} {{trigger.customer.last_name}}",
"customer_email": "{{trigger.customer.email}}",
"customer_phone": "{{trigger.customer.phone}}",
"service_name": "{{trigger.service.name}}",
"amount_paid": "{{trigger.amount}}",
"invoice_number": "{{trigger.payment_intent_id}}",
"total_paid": "{{trigger.amount}}",
"appointment_date": "{{trigger.event.start_time}}",
"appointment_datetime": "{{trigger.event.start_time}}",
},
},
"inputUiInfo": {
"customizedInputs": {},
},
},
},
}, },
"schemaVersion": "1", "schemaVersion": "1",
} }

View File

@@ -0,0 +1,241 @@
"""
Celery tasks for Activepieces automation run tracking.
These tasks run periodically to:
1. Reconcile automation run counts with Activepieces API
2. Reset monthly counters at the start of each billing period
"""
from celery import shared_task
from django.utils import timezone
from django.db.models import Sum
import logging
logger = logging.getLogger(__name__)
@shared_task
def reconcile_automation_run_counts():
"""
Reconcile local automation run counts with Activepieces API.
This task runs periodically (e.g., daily) to ensure our local run counts
are accurate by comparing them with actual flow run data from Activepieces.
Returns:
dict: Summary of reconciliation results
"""
from smoothschedule.identity.core.models import Tenant
from .models import TenantActivepiecesProject, TenantDefaultFlow
from .services import get_activepieces_client, ActivepiecesError
results = {
'tenants_checked': 0,
'flows_checked': 0,
'flows_updated': 0,
'errors': [],
}
client = get_activepieces_client()
# Get all tenants with Activepieces projects
projects = TenantActivepiecesProject.objects.select_related('tenant').all()
for project in projects:
tenant = project.tenant
results['tenants_checked'] += 1
try:
# Get session token for this tenant
token, project_id = client.get_session_token(tenant)
if not token:
logger.warning(f"Could not get session token for tenant {tenant.schema_name}")
continue
# Get flow runs for the current month
current_month_start = timezone.now().replace(
day=1, hour=0, minute=0, second=0, microsecond=0
)
# Get default flows for this tenant
default_flows = TenantDefaultFlow.objects.filter(tenant=tenant)
for flow in default_flows:
results['flows_checked'] += 1
try:
# Query Activepieces for run count
# Note: This requires the flow-runs API endpoint
run_count = _get_flow_run_count(
client,
token,
flow.activepieces_flow_id,
current_month_start,
)
if run_count is not None and run_count != flow.runs_this_month:
logger.info(
f"Reconciling flow {flow.flow_type} for {tenant.schema_name}: "
f"local={flow.runs_this_month}, actual={run_count}"
)
flow.runs_this_month = run_count
flow.runs_month_started = current_month_start.date()
flow.save(update_fields=['runs_this_month', 'runs_month_started'])
results['flows_updated'] += 1
except Exception as e:
error_msg = f"Error checking flow {flow.flow_type}: {str(e)}"
logger.error(error_msg)
results['errors'].append(error_msg)
except ActivepiecesError as e:
error_msg = f"Error for tenant {tenant.schema_name}: {str(e)}"
logger.error(error_msg)
results['errors'].append(error_msg)
except Exception as e:
error_msg = f"Unexpected error for tenant {tenant.schema_name}: {str(e)}"
logger.error(error_msg, exc_info=True)
results['errors'].append(error_msg)
logger.info(
f"Reconciliation complete: {results['tenants_checked']} tenants, "
f"{results['flows_checked']} flows checked, "
f"{results['flows_updated']} flows updated"
)
return results
def _get_flow_run_count(client, token: str, flow_id: str, since: timezone.datetime) -> int | None:
"""
Get the count of runs for a flow since a given date.
Args:
client: ActivepiecesClient instance
token: Session token for API calls
flow_id: The Activepieces flow ID
since: Start date for counting runs
Returns:
Number of runs, or None if unable to determine
"""
try:
# Query the flow-runs endpoint
# GET /v1/flow-runs?flowId=xxx&createdAfter=xxx
endpoint = f"/api/v1/flow-runs?flowId={flow_id}&createdAfter={since.isoformat()}&limit=1000"
response = client._request("GET", endpoint, token=token)
# Response has 'data' array of flow runs
runs = response.get('data', [])
# Count only successful runs
successful_runs = [r for r in runs if r.get('status') == 'SUCCEEDED']
return len(successful_runs)
except Exception as e:
logger.warning(f"Could not get run count for flow {flow_id}: {e}")
return None
@shared_task
def reset_monthly_run_counters():
"""
Reset run counters at the start of each billing month.
This task should run on the first day of each month to reset
all flow run counters.
Note: The increment_run_count() method also handles resetting,
but this task ensures counters are reset even for inactive flows.
Returns:
dict: Summary of reset results
"""
from .models import TenantDefaultFlow
results = {
'flows_reset': 0,
'errors': [],
}
today = timezone.now().date()
current_month_start = today.replace(day=1)
try:
# Find all flows with a runs_month_started before this month
flows_to_reset = TenantDefaultFlow.objects.exclude(
runs_month_started=current_month_start
).filter(runs_this_month__gt=0)
for flow in flows_to_reset:
flow.runs_this_month = 0
flow.runs_month_started = current_month_start
flow.save(update_fields=['runs_this_month', 'runs_month_started'])
results['flows_reset'] += 1
logger.info(f"Reset {results['flows_reset']} flow run counters")
except Exception as e:
error_msg = f"Error resetting run counters: {str(e)}"
logger.error(error_msg, exc_info=True)
results['errors'].append(error_msg)
return results
@shared_task
def get_tenant_automation_usage(tenant_id: int) -> dict:
"""
Get automation usage summary for a specific tenant.
Useful for displaying in quota management UI.
Args:
tenant_id: ID of the tenant
Returns:
dict: Usage summary including runs, limit, and remaining
"""
from smoothschedule.identity.core.models import Tenant
from smoothschedule.identity.core.quota_service import QuotaService
from .models import TenantDefaultFlow
try:
tenant = Tenant.objects.get(id=tenant_id)
quota_service = QuotaService(tenant)
# Get current usage
current_usage = quota_service.get_current_usage('MAX_AUTOMATION_RUNS')
limit = quota_service.get_limit('MAX_AUTOMATION_RUNS')
# Get per-flow breakdown
flows = TenantDefaultFlow.objects.filter(tenant=tenant).values(
'flow_type', 'runs_this_month', 'is_enabled'
)
flow_breakdown = {
f['flow_type']: {
'runs': f['runs_this_month'],
'enabled': f['is_enabled'],
}
for f in flows
}
return {
'tenant_id': tenant_id,
'total_runs': current_usage,
'limit': limit,
'remaining': -1 if limit < 0 else max(0, limit - current_usage),
'is_unlimited': limit < 0,
'flows': flow_breakdown,
}
except Tenant.DoesNotExist:
logger.error(f"Tenant {tenant_id} not found")
return {'error': f'Tenant {tenant_id} not found'}
except Exception as e:
logger.error(f"Error getting usage for tenant {tenant_id}: {str(e)}", exc_info=True)
return {'error': str(e)}

View File

@@ -12,6 +12,7 @@ from .views import (
DefaultFlowsListView, DefaultFlowsListView,
DefaultFlowRestoreView, DefaultFlowRestoreView,
DefaultFlowsRestoreAllView, DefaultFlowsRestoreAllView,
TrackAutomationRunView,
) )
app_name = "activepieces" app_name = "activepieces"
@@ -57,4 +58,10 @@ urlpatterns = [
DefaultFlowsRestoreAllView.as_view(), DefaultFlowsRestoreAllView.as_view(),
name="default-flows-restore-all", name="default-flows-restore-all",
), ),
# Track automation run for quota management
path(
"track-run/",
TrackAutomationRunView.as_view(),
name="track-run",
),
] ]

View File

@@ -381,6 +381,111 @@ class DefaultFlowRestoreView(TenantRequiredAPIView, APIView):
) )
class TrackAutomationRunView(APIView):
"""
Track an automation flow execution for quota management.
POST /api/activepieces/track-run/
This endpoint is called by the Track Run action in Activepieces flows
to increment the run counter for quota tracking.
Body:
{
"flow_id": "activepieces-flow-id",
"tenant_id": 123 # Optional, extracted from flow if not provided
}
Returns:
{
"success": true,
"runs_this_month": 42,
"limit": 2000,
"remaining": 1958
}
"""
permission_classes = [AllowAny] # Called from Activepieces, auth via API key
def post(self, request):
from smoothschedule.identity.core.models import Tenant
flow_id = request.data.get("flow_id")
tenant_id = request.data.get("tenant_id")
if not flow_id:
return Response(
{"error": "flow_id is required"},
status=status.HTTP_400_BAD_REQUEST,
)
# Find the flow by Activepieces flow ID
try:
default_flow = TenantDefaultFlow.objects.select_related("tenant").get(
activepieces_flow_id=flow_id
)
tenant = default_flow.tenant
except TenantDefaultFlow.DoesNotExist:
# If not a default flow, try to find tenant from tenant_id
if tenant_id:
try:
tenant = Tenant.objects.get(id=tenant_id)
except Tenant.DoesNotExist:
return Response(
{"error": "Tenant not found"},
status=status.HTTP_404_NOT_FOUND,
)
else:
# Try to find by project ID if we have it
project_id = request.data.get("project_id")
if project_id:
try:
project = TenantActivepiecesProject.objects.select_related(
"tenant"
).get(project_id=project_id)
tenant = project.tenant
except TenantActivepiecesProject.DoesNotExist:
return Response(
{"error": "Project not found"},
status=status.HTTP_404_NOT_FOUND,
)
else:
return Response(
{"error": "Flow not found and no tenant_id provided"},
status=status.HTTP_404_NOT_FOUND,
)
default_flow = None
# Increment run count
if default_flow:
default_flow.increment_run_count()
# Get current usage and limit
from smoothschedule.identity.core.quota_service import QuotaService
quota_service = QuotaService(tenant)
current_usage = quota_service.get_current_usage("MAX_AUTOMATION_RUNS")
limit = quota_service.get_limit("MAX_AUTOMATION_RUNS")
# Calculate remaining (-1 means unlimited)
if limit < 0:
remaining = -1 # Unlimited
else:
remaining = max(0, limit - current_usage)
logger.info(
f"Tracked automation run for tenant {tenant.schema_name}: "
f"{current_usage}/{limit} (flow: {flow_id})"
)
return Response({
"success": True,
"runs_this_month": current_usage,
"limit": limit,
"remaining": remaining,
})
class DefaultFlowsRestoreAllView(TenantRequiredAPIView, APIView): class DefaultFlowsRestoreAllView(TenantRequiredAPIView, APIView):
""" """
Restore all default flows to their original definitions. Restore all default flows to their original definitions.