Add Activepieces integration for workflow automation

- Add Activepieces fork with SmoothSchedule custom piece
- Create integrations app with Activepieces service layer
- Add embed token endpoint for iframe integration
- Create Automations page with embedded workflow builder
- Add sidebar visibility fix for embed mode
- Add list inactive customers endpoint to Public API
- Include SmoothSchedule triggers: event created/updated/cancelled
- Include SmoothSchedule actions: create/update/cancel events, list resources/services/customers

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
poduck
2025-12-18 22:59:37 -05:00
parent 9848268d34
commit 3aa7199503
16292 changed files with 1284892 additions and 4708 deletions

View File

@@ -0,0 +1,37 @@
{
"extends": ["../api/.eslintrc.json"],
"ignorePatterns": ["!**/*"],
"overrides": [
{
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"],
"rules": {}
},
{
"files": ["*.ts", "*.tsx"],
"rules": {}
},
{
"files": ["*.js", "*.jsx"],
"rules": {}
},
{
"files": ["*.json"],
"parser": "jsonc-eslint-parser",
"rules": {
"@nx/dependency-checks": "error"
}
},
{
"files": ["package.json"],
"parser": "jsonc-eslint-parser",
"rules": {
"@nx/dependency-checks": [
"error",
{
"ignoredDependencies": ["write-file-atomic"]
}
]
}
}
]
}

View File

@@ -0,0 +1,7 @@
# server-worker
This library was generated with [Nx](https://nx.dev).
## Building
Run `nx build server-worker` to build the library.

View File

@@ -0,0 +1,31 @@
{
"name": "server-worker",
"version": "0.1.0",
"type": "commonjs",
"main": "./src/index.js",
"typings": "./src/index.d.ts",
"dependencies": {
"@activepieces/pieces-framework": "0.22.2",
"@activepieces/server-shared": "0.0.2",
"@activepieces/shared": "0.30.2",
"write-file-atomic": "5.0.1",
"tslib": "2.6.2",
"@opentelemetry/api": "1.9.0",
"axios": "1.13.1",
"axios-retry": "4.4.1",
"fastify": "5.4.0",
"fetch-retry": "6.0.0",
"p-limit": "2.3.0",
"chalk": "4.1.2",
"chokidar": "4.0.3",
"nanoid": "3.3.8",
"tree-kill": "1.2.2",
"dayjs": "1.11.9",
"bullmq": "5.61.0",
"bullmq-otel": "1.0.1",
"socket.io-client": "4.8.1",
"jsonwebtoken": "9.0.1",
"socket.io": "4.8.1",
"http-status-codes": "2.2.0"
}
}

View File

@@ -0,0 +1,23 @@
{
"name": "server-worker",
"$schema": "../../../node_modules/nx/schemas/project-schema.json",
"sourceRoot": "packages/server/worker/src",
"projectType": "library",
"targets": {
"build": {
"executor": "@nx/js:tsc",
"outputs": ["{options.outputPath}"],
"options": {
"outputPath": "dist/packages/server/worker",
"main": "packages/server/worker/src/index.ts",
"tsConfig": "packages/server/worker/tsconfig.lib.json",
"assets": ["packages/server/worker/*.md"]
}
},
"lint": {
"executor": "@nx/eslint:lint",
"outputs": ["{options.outputFile}"]
}
},
"tags": []
}

View File

@@ -0,0 +1,10 @@
// Public surface of the server-worker library.
// Fix: removed the duplicated `./lib/utils/machine` re-export (it was
// listed twice, which is redundant and can confuse dependency tooling).
export * from './lib/utils/machine'
export * from './lib/compute/engine-runner-types'
export * from './lib/consume/executors/flow-job-executor'
export * from './lib/utils/webhook-utils'
export * from './lib/flow-worker'
export * from './lib/cache/pieces/development/dev-pieces-builder'
export * from './lib/cache/package-manager'
export * from './lib/cache/pieces/production/registry-piece-manager'
// Imported for its side effects (file watching support in dev mode).
import 'chokidar'

View File

@@ -0,0 +1,103 @@
import { isNil, spreadIfDefined } from '@activepieces/shared'
import { context, propagation } from '@opentelemetry/api'
import axios, { AxiosError, AxiosInstance, isAxiosError } from 'axios'
import axiosRetry from 'axios-retry'
/**
 * Wraps an AxiosError so callers can detect HTTP-client failures via
 * `ApAxiosClient.isApAxiosError` while keeping the original axios error
 * attached. `message` is expected to be the JSON summary produced by
 * `ApAxiosClient.formatAxiosError`.
 */
export class ApAxiosError extends Error {
    constructor(public error: AxiosError, message?: string) {
        super(message)
    }
}
/**
 * Thin axios wrapper used by the worker to call the internal API.
 * - Adds a bearer token and JSON content type to every request.
 * - Injects the active OpenTelemetry trace context into request headers.
 * - Retries 5xx responses up to 3 times with linear backoff (5s, 10s, 15s).
 * - Normalizes axios failures into ApAxiosError with a JSON-summary message.
 */
export class ApAxiosClient {
    private _axios: AxiosInstance

    constructor(baseUrl: string, apiToken: string) {
        this._axios = axios.create({
            baseURL: baseUrl,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${apiToken}`,
            },
        })
        // Propagate W3C trace headers so server spans link to worker spans.
        this._axios.interceptors.request.use((config) => {
            const traceHeaders: Record<string, string> = {}
            propagation.inject(context.active(), traceHeaders)
            Object.entries(traceHeaders).forEach(([key, value]) => {
                config.headers.set(key, value)
            })
            return config
        })
        axiosRetry(this._axios, {
            retries: 3,
            retryDelay: (retryCount: number) => {
                return retryCount * 5000
            },
            // Typed per axios-retry's documented signature; only retry
            // server-side (5xx) failures — 4xx errors are not transient.
            retryCondition: (error: AxiosError) => {
                const status = error.response?.status
                return !isNil(status) && status >= 500
            },
        })
    }

    /** Type guard for errors raised by this client. */
    static isApAxiosError(error: unknown): error is ApAxiosError {
        return error instanceof ApAxiosError
    }

    /**
     * POSTs `data` as JSON and returns the parsed response body.
     * @throws ApAxiosError on HTTP failure; rethrows non-axios errors as-is.
     */
    async post<T>(url: string, data: unknown): Promise<T> {
        try {
            const response = await this._axios.post<T>(url, data)
            return response.data
        }
        catch (error) {
            if (isAxiosError(error)) {
                throw this.formatAxiosError(error)
            }
            throw error
        }
    }

    /**
     * GETs `url` with optional query params / response type.
     * @throws ApAxiosError on HTTP failure; rethrows non-axios errors as-is.
     */
    async get<T>(url: string, opts: {
        params?: Record<string, string>
        responseType?: 'arraybuffer' | undefined
    }): Promise<T> {
        try {
            const response = await this._axios.get<T>(url, {
                ...spreadIfDefined('params', opts.params),
                ...spreadIfDefined('responseType', opts.responseType),
            })
            return response.data
        }
        catch (error) {
            if (isAxiosError(error)) {
                throw this.formatAxiosError(error)
            }
            throw error
        }
    }

    // Builds an ApAxiosError whose message is a compact JSON summary of the
    // request/response so logs capture status and headers without dumping
    // the full (circular) axios error object.
    private formatAxiosError(error: AxiosError): Error {
        const { request, response, message } = error
        return new ApAxiosError(error, JSON.stringify({
            message,
            request: request && {
                method: request.method,
                url: request.path,
                headers: request._header,
            },
            response: response && {
                status: response.status,
                headers: response.headers,
                data: response.data,
            },
        }))
    }
}

View File

@@ -0,0 +1,153 @@
import { PieceMetadataModel } from '@activepieces/pieces-framework'
import { MigrateJobsRequest, SavePayloadRequest, SubmitPayloadsRequest } from '@activepieces/server-shared'
import { ExecutioOutputFile, FlowRun, FlowVersion, GetFlowVersionForWorkerRequest, GetPieceRequestQuery, JobData, tryCatch } from '@activepieces/shared'
import { trace } from '@opentelemetry/api'
import fetchRetry from 'fetch-retry'
import pLimit from 'p-limit'
import { workerMachine } from '../utils/machine'
import { ApAxiosClient } from './ap-axios'
// fetch wrapper with retry support, used for plain (non-axios) downloads.
const fetchWithRetry = fetchRetry(global.fetch)
// Tracer for spans emitted by the worker API service.
const tracer = trace.getTracer('worker-api-service')
// Normalizes a base URL by stripping one trailing slash, so path
// concatenation never produces a double slash.
const removeTrailingSlash = (url: string): string => url.replace(/\/$/, '')
export const flowRunLogs = {
async get(fullUrl: string): Promise<ExecutioOutputFile | null> {
const { data, error } = await tryCatch<ExecutioOutputFile | null, Error>(
async () => {
const response = await fetchWithRetry(fullUrl, {
method: 'GET',
headers: {
'Content-Type': 'application/json',
},
retries: 3,
retryDelay: 3000,
retryOn: (status: number) => Math.floor(status / 100) === 5,
})
if (response.status === 404) {
return null
}
return (await response.json()) as ExecutioOutputFile
},
)
if (error) {
if (error instanceof SyntaxError) {
return null
}
throw error
}
return data
},
}
/**
 * API client used by the worker process itself (authenticated with the
 * worker token) to call the internal Activepieces API.
 */
export const workerApiService = () => {
    const client = new ApAxiosClient(removeTrailingSlash(workerMachine.getInternalApiUrl()), workerMachine.getWorkerToken())
    return {
        /** Persists trigger payloads as sample data for the flow editor. */
        async savePayloadsAsSampleData(request: SavePayloadRequest): Promise<void> {
            await client.post('/v1/workers/save-payloads', request)
        },
        /** Downloads a piece archive by file id as a raw buffer. */
        async getPieceArchive(fileId: string): Promise<Buffer> {
            return client.get<Buffer>(`/v1/workers/archive/${fileId}`, {
                responseType: 'arraybuffer',
            })
        },
        /** Asks the server to migrate a queued job's data to the latest shape. */
        async migrateJob(request: MigrateJobsRequest): Promise<JobData> {
            return client.post<JobData>('/v1/workers/migrate-job', request)
        },
        /**
         * Submits trigger payloads to start flow runs. Payloads are split
         * into ~1MB batches and submitted sequentially (pLimit(1)); every
         * batch is attempted, then the call throws if any batch failed.
         * The whole operation is traced as one span.
         */
        async startRuns(request: SubmitPayloadsRequest): Promise<FlowRun[]> {
            return tracer.startActiveSpan('worker.api.startRuns', {
                attributes: {
                    'worker.flowVersionId': request.flowVersionId,
                    'worker.projectId': request.projectId,
                    'worker.environment': request.environment,
                    'worker.payloadsCount': request.payloads.length,
                    'worker.httpRequestId': request.httpRequestId ?? 'none',
                },
            }, async (span) => {
                try {
                    const arrayOfPayloads = splitPayloadsIntoOneMegabyteBatches(request.payloads)
                    span.setAttribute('worker.batchesCount', arrayOfPayloads.length)
                    // Concurrency 1 keeps large request bodies sequential so
                    // the server is not flooded with parallel submissions.
                    const limit = pLimit(1)
                    const promises = arrayOfPayloads.map(payloads =>
                        limit(() => client.post<FlowRun[]>('/v1/workers/submit-payloads', {
                            ...request,
                            payloads,
                            parentRunId: request.parentRunId,
                            failParentOnFailure: request.failParentOnFailure,
                        })),
                    )
                    // allSettled: attempt every batch before reporting failure.
                    const results = await Promise.allSettled(promises)
                    const errors = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected')
                    if (errors.length > 0) {
                        const errorMessages = errors.map(e => e.reason.message).join(', ')
                        span.setAttribute('worker.error', true)
                        span.setAttribute('worker.errorMessage', errorMessages)
                        throw new Error(`Failed to start runs: ${errorMessages}`)
                    }
                    const flowRuns = results
                        .filter((r): r is PromiseFulfilledResult<FlowRun[]> => r.status === 'fulfilled')
                        .map(r => r.value)
                        .flat()
                    span.setAttribute('worker.runsCreated', flowRuns.length)
                    return flowRuns
                }
                finally {
                    span.end()
                }
            })
        },
    }
}
/**
 * Splits payloads into batches whose serialized JSON size stays near 1MB,
 * so a single submit request never exceeds the server's body limit.
 *
 * Fixes: the previous version produced an empty leading batch whenever the
 * first payload alone crossed the limit (and returned `[[]]` for empty
 * input), which caused pointless submit requests with zero payloads.
 *
 * @param payloads - arbitrary JSON-serializable trigger payloads
 * @returns non-empty batches, preserving the original payload order
 */
function splitPayloadsIntoOneMegabyteBatches(payloads: unknown[]): unknown[][] {
    const ONE_MB = 1024 * 1024
    const batches: unknown[][] = [[]]
    let currentSize = 0
    for (const payload of payloads) {
        const payloadSize = Buffer.byteLength(JSON.stringify(payload))
        currentSize += payloadSize
        // Start a new batch once the running size crosses 1MB — but never
        // leave the current batch empty: a single oversized payload still
        // occupies its own batch.
        if (currentSize > ONE_MB && batches[batches.length - 1].length > 0) {
            batches.push([])
            currentSize = payloadSize
        }
        batches[batches.length - 1].push(payload)
    }
    // Drop the placeholder batch for empty input so callers never submit
    // an empty run request.
    return batches.filter(batch => batch.length > 0)
}
/**
 * API client scoped to a single engine token, used while executing a flow
 * to fetch piece metadata and flow versions from the internal API.
 */
export const engineApiService = (engineToken: string) => {
    const client = new ApAxiosClient(removeTrailingSlash(workerMachine.getInternalApiUrl()), engineToken)
    return {
        /** Fetches piece metadata for a given name and version query. */
        async getPiece(name: string, options: GetPieceRequestQuery): Promise<PieceMetadataModel> {
            const encodedName = encodeURIComponent(name)
            return client.get<PieceMetadataModel>(`/v1/pieces/${encodedName}`, { params: options })
        },
        /** Fetches a flow version for the worker; resolves null when absent. */
        async getFlowVersion(request: GetFlowVersionForWorkerRequest): Promise<FlowVersion | null> {
            return client.get<FlowVersion | null>('/v1/engine/flows', { params: request })
        },
    }
}

View File

@@ -0,0 +1,96 @@
import { emitWithAck as emitWithAckUtil, tryCatch, WebsocketServerEvent } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { io, Socket } from 'socket.io-client'
import { workerMachine } from './utils/machine'
// Module-level singleton socket, assigned by appSocket(...).init().
let socket: Socket
// Worker auth token captured at init time for the connection handshake.
let workerToken: string
/**
 * Socket.io client connecting this worker to the server. A single
 * module-level socket is shared; call init() once before emitWithAck().
 */
export const appSocket = (log: FastifyBaseLogger) => ({
    /**
     * Opens the websocket connection, wires lifecycle logging, and
     * answers server health checks. `onConnect` runs on every
     * (re)connect, so listeners registered there survive reconnections.
     */
    init: async (params: {
        workerToken: string
        onConnect: (socket: Socket) => Promise<void>
    }): Promise<void> => {
        workerToken = params.workerToken
        const { url, path } = workerMachine.getSocketUrlAndPath()
        // autoConnect false: auth payload is attached before connecting.
        socket = io(url, {
            transports: ['websocket'],
            path,
            autoConnect: false,
            reconnection: true,
        })
        socket.auth = {
            token: workerToken,
            workerId: workerMachine.getWorkerId(),
            platformIdForDedicatedWorker: workerMachine.getPlatformIdForDedicatedWorker(),
        }
        socket.on('connect', async () => {
            log.info({
                message: 'Connected to server',
                workerId: workerMachine.getWorkerId(),
                socketId: socket.id,
            })
            await params.onConnect(socket)
        })
        socket.io.on('reconnect_attempt', (attempt: number) => {
            log.info({
                message: 'Socket reconnect attempt',
                attempt,
            })
        })
        socket.on('connect_error', (error) => {
            log.error({
                message: 'Socket connection error',
                error: error.message,
            })
        })
        socket.on('error', (error) => {
            log.error({
                message: 'Socket error',
                error: error.message,
            })
        })
        socket.connect()
        // Respond to server health checks with current machine stats.
        socket.on(WebsocketServerEvent.WORKER_HEALTHCHECK, async (_, callback) => {
            const settings = await workerMachine.getSystemInfo()
            callback?.(settings)
        })
    },
    /**
     * Emits `event` and awaits the server's acknowledgement, retrying up
     * to 3 times (4s timeout, 2s between attempts). Logs and rethrows on
     * final failure. Requires init() to have been called first.
     */
    emitWithAck: async <T = unknown>(event: string, data: unknown): Promise<T> => {
        const result = await tryCatch(() => {
            return emitWithAckUtil<T>(socket, event, data, {
                timeoutMs: 4000,
                retries: 3,
                retryDelayMs: 2000,
            })
        })
        if (result.error) {
            log.error({
                message: 'Failed to emit event',
                event,
                data,
                error: result.error,
            })
            throw result.error
        }
        return result.data
    },
    /** Closes the connection if one was opened. */
    disconnect: (): void => {
        if (socket) {
            socket.disconnect()
        }
    },
})

View File

@@ -0,0 +1,97 @@
import { readFile } from 'node:fs/promises'
import { join } from 'path'
import { fileSystemUtils, memoryLock } from '@activepieces/server-shared'
import { isNil } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import writeFileAtomic from 'write-file-atomic'
// Flat key -> value map persisted as JSON on disk.
type CacheMap = Record<string, string>
// Absolute path of the cache file inside a given cache folder.
const cachePath = (folderPath: string): string => join(folderPath, 'cache.json')
// Per-folder in-process mirror of the on-disk cache (lazily loaded).
const cached: Record<string, CacheMap | null> = {}
export const NO_SAVE_GUARD = (_: string): boolean => false
/**
 * Small persistent key/value cache stored as JSON (`cache.json`) inside
 * `folderPath`, fronted by an in-process memory mirror. Used to memoize
 * expensive install steps (engine copy, piece installs, code builds).
 */
export const cacheState = (folderPath: string, log: FastifyBaseLogger) => {
    return {
        /**
         * Returns the cached value for `key` unless `cacheMiss` deems it
         * stale; on a miss, runs `installFn` under a per-folder lock and
         * persists its result — unless `skipSave` says the fresh value
         * must not be written (e.g. still-mutable flow versions).
         */
        async getOrSetCache({
            cacheMiss,
            key,
            installFn,
            skipSave,
        }: CacheStateParams): Promise<CacheResult> {
            // Fast path: consult the in-memory mirror without locking.
            const cache = await readCacheFromMemory(folderPath)
            const value = cache[key] as string | null
            if (!isNil(value) && !cacheMiss(value)) {
                return {
                    cacheHit: true,
                    state: value,
                }
            }
            return memoryLock.runExclusive({
                key: `cache-save-${folderPath}`,
                fn: async () => {
                    // Double-check on disk under the lock: another caller
                    // may have installed the value while we waited.
                    const cacheFromDisk = await readCacheFromFile(folderPath)
                    const valueFromDisk = cacheFromDisk[key]
                    if (!isNil(valueFromDisk) && !cacheMiss(valueFromDisk)) {
                        cached[folderPath] = cacheFromDisk
                        return { cacheHit: true, state: valueFromDisk }
                    }
                    const value = await installFn()
                    if (skipSave(value)) {
                        return {
                            cacheHit: false,
                            state: value,
                        }
                    }
                    const freshCache = await cacheState(folderPath, log).saveCache(
                        key,
                        value,
                    )
                    // Keep the memory mirror in sync with what was written.
                    cached[folderPath] = freshCache
                    return {
                        cacheHit: false,
                        state: value,
                    }
                },
            })
        },
        /**
         * Persists `key` -> `value`, merging into the cache already on
         * disk. The atomic write prevents torn cache files under
         * concurrent processes.
         */
        saveCache: async (key: string, value: string): Promise<CacheMap> => {
            await fileSystemUtils.threadSafeMkdir(folderPath)
            const cacheFilePath = cachePath(folderPath)
            const freshCache = await readCacheFromFile(folderPath)
            freshCache[key] = value
            await writeFileAtomic(cacheFilePath, JSON.stringify(freshCache), 'utf8')
            return freshCache
        },
    }
}
// Loads the on-disk cache for a folder; a missing file is an empty cache.
async function readCacheFromFile(folderPath: string): Promise<CacheMap> {
    const filePath = cachePath(folderPath)
    if (!(await fileSystemUtils.fileExists(filePath))) {
        return {}
    }
    const raw = await readFile(filePath, 'utf8')
    return JSON.parse(raw)
}
// Returns the in-process mirror for a folder, hydrating it from disk on
// first access.
async function readCacheFromMemory(folderPath: string): Promise<CacheMap> {
    const existing = cached[folderPath]
    if (!isNil(existing)) {
        return existing
    }
    const fromDisk = await readCacheFromFile(folderPath)
    cached[folderPath] = fromDisk
    return fromDisk
}
// Result of getOrSetCache: whether the value came from cache, plus the
// value itself.
type CacheResult = {
    cacheHit: boolean
    state: string | null
}
// Inputs for getOrSetCache; see cacheState for how each hook is used.
type CacheStateParams = {
    key: string
    // Returns true when the cached value should be treated as stale.
    cacheMiss: (value: string) => boolean
    // Produces the fresh value on a cache miss.
    installFn: () => Promise<string>
    // Returns true when the fresh value must NOT be persisted.
    skipSave: (value: string) => boolean
}

View File

@@ -0,0 +1,216 @@
import fs, { rm } from 'node:fs/promises'
import path from 'node:path'
import { cryptoUtils, fileSystemUtils } from '@activepieces/server-shared'
import { ExecutionMode, tryCatch } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { CodeArtifact } from '../compute/engine-runner-types'
import { workerMachine } from '../utils/machine'
import { cacheState, NO_SAVE_GUARD } from './cache-state'
import { packageManager } from './package-manager'
// tsconfig written next to each user code step before compiling.
// Intentionally permissive (all strict checks off) so loosely typed
// user-authored code still compiles.
const TS_CONFIG_CONTENT = `
{
    "compilerOptions": {
        "lib": ["es2022", "dom"],
        "module": "commonjs",
        "target": "es2022",
        "esModuleInterop": true,
        "skipLibCheck": true,
        "forceConsistentCasingInFileNames": true,
        "noUnusedLocals": false,
        "noUnusedParameters": false,
        "strict": false,
        "strictPropertyInitialization": false,
        "strictNullChecks": false,
        "strictFunctionTypes": false,
        "strictBindCallApply": false,
        "noImplicitAny": false,
        "noImplicitThis": false,
        "noImplicitReturns": false,
        "noFallthroughCasesInSwitch": false
    }
}
`
// Fallback module emitted when compilation fails: executing the step
// rethrows the captured compiler error at run time.
const INVALID_ARTIFACT_TEMPLATE = `
exports.code = async (params) => {
    throw new Error(\`\${ERROR_MESSAGE}\`);
};
`
// Placeholder token in the template above, replaced with the actual
// compiler error message.
const INVALID_ARTIFACT_ERROR_PLACEHOLDER = '${ERROR_MESSAGE}'
/**
 * Builds user code steps into runnable CJS modules inside the worker's
 * code cache, memoized by a hash of each step's source code.
 */
export const codeBuilder = (log: FastifyBaseLogger) => ({
    /** Folder that holds all built code steps of one flow version. */
    getCodesFolder({
        codesFolderPath,
        flowVersionId,
    }: {
        codesFolderPath: string
        flowVersionId: string
    }): string {
        return path.join(codesFolderPath, flowVersionId)
    },
    /**
     * Installs dependencies and compiles one code step. The source hash
     * is cached per step folder, so unchanged steps are skipped.
     * Compilation failures do not throw — the output is replaced with a
     * stub that throws at run time (see handleCompilationError).
     */
    async processCodeStep({
        artifact,
        codesFolderPath,
    }: ProcessCodeStepParams): Promise<void> {
        const { sourceCode, flowVersionId, name } = artifact
        const flowVersionPath = this.getCodesFolder({
            codesFolderPath,
            flowVersionId,
        })
        const codePath = path.join(flowVersionPath, name)
        log.debug({
            message: 'CodeBuilder#processCodeStep',
            sourceCode,
            name,
            codePath,
        })
        const currentHash = await cryptoUtils.hashObject(sourceCode)
        const cache = cacheState(codePath, log)
        await cache.getOrSetCache({
            key: codePath,
            // Rebuild whenever the stored hash no longer matches the source.
            cacheMiss: (value: string) => {
                return value !== currentHash
            },
            installFn: async () => {
                const { code, packageJson } = sourceCode
                // Clear any stale build output before reinstalling.
                const codeNeedCleanUp = await fileSystemUtils.fileExists(codePath)
                if (codeNeedCleanUp) {
                    await rm(codePath, { recursive: true })
                }
                await fileSystemUtils.threadSafeMkdir(codePath)
                const startTime = performance.now()
                await installDependencies({
                    path: codePath,
                    packageJson: await getPackageJson(packageJson),
                    log,
                })
                log.info({
                    message: '[CodeBuilder#processCodeStep] Installed dependencies',
                    path: codePath,
                    timeTaken: `${Math.floor(performance.now() - startTime)}ms`,
                })
                const startTimeCompilation = performance.now()
                const { error } = await tryCatch(() => compileCode({
                    path: codePath,
                    code,
                    log,
                }))
                if (error) {
                    log.info({ codePath, error }, '[CodeBuilder#processCodeStep] Compilation error')
                    await handleCompilationError({ codePath, error })
                }
                else {
                    log.info({ codePath, timeTaken: `${Math.floor(performance.now() - startTimeCompilation)}ms` }, '[CodeBuilder#processCodeStep] Compilation success')
                }
                // The hash is cached even on compile failure, so a broken
                // step is not rebuilt until its source actually changes.
                return currentHash
            },
            skipSave: NO_SAVE_GUARD,
        })
    },
})
/**
 * Whether user code steps may declare npm dependencies. Only the
 * code-only sandbox forbids packages; unknown modes default to false.
 */
function isPackagesAllowed(): boolean {
    const mode = workerMachine.getSettings().EXECUTION_MODE
    const modesWithPackages = [
        ExecutionMode.SANDBOX_CODE_AND_PROCESS,
        ExecutionMode.UNSANDBOXED,
        ExecutionMode.SANDBOX_PROCESS,
    ]
    return modesWithPackages.includes(mode)
}
async function getPackageJson(packageJson: string): Promise<string> {
const packagedAllowed = isPackagesAllowed()
if (!packagedAllowed) {
return '{"dependencies":{}}'
}
const { data: parsedPackageJson, error: parseError } = await tryCatch(() => JSON.parse(packageJson))
const packageJsonObject = parseError ? {} : (parsedPackageJson as Record<string, unknown>)
return JSON.stringify({
...packageJsonObject,
dependencies: {
'@types/node': '18.17.1',
...(packageJsonObject?.['dependencies'] ?? {}),
},
})
}
// Writes the step's package.json and runs the package manager only when
// at least one dependency is declared.
const installDependencies = async ({ path, packageJson, log }: InstallDependenciesParams): Promise<void> => {
    await fs.writeFile(`${path}/package.json`, packageJson, 'utf8')
    const dependencies = JSON.parse(packageJson).dependencies ?? {}
    const hasDependencies = Object.keys(dependencies).length > 0
    if (hasDependencies) {
        await packageManager(log).install({ path, filtersPath: [] })
    }
}
// Writes tsconfig.json and index.ts into the step folder, then bundles
// the step into index.js.
const compileCode = async ({
    path,
    code,
    log,
}: CompileCodeParams): Promise<void> => {
    const writeOptions = { encoding: 'utf8', flag: 'w' } as const
    await fs.writeFile(`${path}/tsconfig.json`, TS_CONFIG_CONTENT, writeOptions)
    await fs.writeFile(`${path}/index.ts`, code, writeOptions)
    await packageManager(log).build({
        path,
        entryFile: `${path}/index.ts`,
        outputFile: `${path}/index.js`,
    })
}
/**
 * Replaces the step's compiled output with a stub module that rethrows
 * the compiler error at execution time, so a broken code step fails
 * loudly when the flow runs rather than being silently skipped.
 */
const handleCompilationError = async ({
    codePath,
    error,
}: HandleCompilationErrorParams): Promise<void> => {
    // Prefer the captured stdout (where build diagnostics are printed,
    // when present on the error) over the raw stringified error.
    const hasStdout = typeof error === 'object' && error !== null && 'stdout' in error
    const stdoutError = hasStdout ? (error as { stdout?: unknown }).stdout : undefined
    const genericError = `${error ?? 'error compiling'}`
    const errorMessage = `Compilation Error ${stdoutError ?? genericError}`
    const stubModule = INVALID_ARTIFACT_TEMPLATE.replace(
        INVALID_ARTIFACT_ERROR_PLACEHOLDER,
        errorMessage,
    )
    await fs.writeFile(`${codePath}/index.js`, stubModule, 'utf8')
}
// Inputs for codeBuilder.processCodeStep.
type ProcessCodeStepParams = {
    artifact: CodeArtifact
    codesFolderPath: string
    log: FastifyBaseLogger
}
// Inputs for installDependencies.
type InstallDependenciesParams = {
    path: string
    packageJson: string
    log: FastifyBaseLogger
}
// Inputs for compileCode.
type CompileCodeParams = {
    path: string
    code: string
    log: FastifyBaseLogger
}
// Inputs for handleCompilationError.
type HandleCompilationErrorParams = {
    codePath: string
    error: unknown
}

View File

@@ -0,0 +1,50 @@
import { PathLike } from 'fs'
import { copyFile, rename } from 'node:fs/promises'
import { dirname, join } from 'node:path'
import { fileSystemUtils, systemConstants } from '@activepieces/server-shared'
import { ApEnvironment } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { nanoid } from 'nanoid'
import { workerMachine } from '../utils/machine'
import { cacheState, NO_SAVE_GUARD } from './cache-state'
// Path of the prebuilt engine bundle shipped with the worker.
const engineExecutablePath = systemConstants.ENGINE_EXECUTABLE_PATH
// Unique per process: cache entries written by earlier processes are
// treated as stale, forcing one fresh copy per process.
const ENGINE_CACHE_ID = nanoid()
// Cache key under which the installed-engine marker is stored.
const ENGINE_INSTALLED = 'ENGINE_INSTALLED'
/**
 * Copies the engine bundle (main.js + source map) into a sandbox folder,
 * memoized via the folder's cache file. ENGINE_CACHE_ID is unique per
 * process, so the copy happens at most once per process per folder; in
 * development the cache is always considered stale so a rebuilt engine
 * is picked up immediately.
 */
export const engineInstaller = (log: FastifyBaseLogger) => ({
    async install({ path }: InstallParams): Promise<EngineInstallResult> {
        const isDev = workerMachine.getSettings().ENVIRONMENT === ApEnvironment.DEVELOPMENT
        const cache = cacheState(path, log)
        const { cacheHit } = await cache.getOrSetCache({
            key: ENGINE_INSTALLED,
            cacheMiss: (key: string) => {
                // Strict equality (was loose `==`): the cached marker must
                // match this process's unique id exactly.
                const isEngineInstalled = key === ENGINE_CACHE_ID
                return !isEngineInstalled || isDev
            },
            installFn: async () => {
                await atomicCopy(engineExecutablePath, `${path}/main.js`)
                await atomicCopy(`${engineExecutablePath}.map`, `${path}/main.js.map`)
                return ENGINE_CACHE_ID
            },
            skipSave: NO_SAVE_GUARD,
        })
        return { cacheHit }
    },
})
async function atomicCopy(src: PathLike, dest: PathLike): Promise<void> {
const destDir = dirname(dest.toString())
const tempPath = join(destDir, 'engine.temp.js')
await fileSystemUtils.threadSafeMkdir(destDir)
await copyFile(src, tempPath)
await rename(tempPath, dest)
}
// Target sandbox folder for the engine files.
type InstallParams = {
    path: string
}
// Whether the copy was skipped because the cache marker was fresh.
type EngineInstallResult = {
    cacheHit: boolean
}

View File

@@ -0,0 +1,73 @@
import { fileSystemUtils } from '@activepieces/server-shared'
import { getPieceNameFromAlias, PiecePackage, unique } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { CodeArtifact } from '../compute/engine-runner-types'
import { workerMachine } from '../utils/machine'
import { codeBuilder } from './code-builder'
import { engineInstaller } from './engine-installer'
import { registryPieceManager } from './pieces/production/registry-piece-manager'
import { GLOBAL_CACHE_COMMON_PATH, GLOBAL_CACHE_PATH_LATEST_VERSION, GLOBAL_CODE_CACHE_PATH } from './worker-cache'
/**
 * Provisions everything a flow execution needs inside the sandbox cache:
 * compiled code steps, the engine bundle, and registry pieces.
 */
export const executionFiles = (log: FastifyBaseLogger) => ({
    async provision({
        pieces,
        codeSteps,
    }: ProvisionParams): Promise<void> {
        const startTime = performance.now()
        await fileSystemUtils.threadSafeMkdir(GLOBAL_CACHE_PATH_LATEST_VERSION)
        const startTimeCode = performance.now()
        await fileSystemUtils.threadSafeMkdir(GLOBAL_CODE_CACHE_PATH)
        // This is sequential to ensure the worker machine is not overloaded
        for (const artifact of codeSteps) {
            await codeBuilder(log).processCodeStep({
                artifact,
                codesFolderPath: GLOBAL_CODE_CACHE_PATH,
                log,
            })
        }
        log.info({
            path: GLOBAL_CODE_CACHE_PATH,
            timeTaken: `${Math.floor(performance.now() - startTimeCode)}ms`,
        }, 'Installed code in sandbox')
        const startTimeEngine = performance.now()
        const { cacheHit } = await engineInstaller(log).install({
            path: GLOBAL_CACHE_COMMON_PATH,
        })
        log.info({
            path: GLOBAL_CACHE_COMMON_PATH,
            timeTaken: `${Math.floor(performance.now() - startTimeEngine)}ms`,
            cacheHit,
        }, 'Installed engine in sandbox')
        // Dev pieces are built/linked by the dev pieces builder, so only
        // registry (non-dev) pieces are installed here.
        const devPieces = workerMachine.getSettings().DEV_PIECES
        const nonDevPieces = unique(pieces.filter((p) => !devPieces.includes(getPieceNameFromAlias(p.pieceName))))
        if (nonDevPieces.length > 0) {
            // NOTE: this inner `startTime` shadows the outer one above —
            // the final log below still uses the outer timer as intended.
            const startTime = performance.now()
            await registryPieceManager(log).install({
                pieces: nonDevPieces,
                includeFilters: true,
                broadcast: true,
            })
            log.info({
                pieces: nonDevPieces.map(p => `${p.pieceName}@${p.pieceVersion}`),
                path: GLOBAL_CACHE_COMMON_PATH,
                timeTaken: `${Math.floor(performance.now() - startTime)}ms`,
            }, 'Installed pieces in sandbox')
        }
        log.info({
            timeTaken: `${Math.floor(performance.now() - startTime)}ms`,
        }, 'Sandbox installation complete')
    },
})
// Pieces and code artifacts required by one flow execution.
type ProvisionParams = {
    pieces: PiecePackage[]
    codeSteps: CodeArtifact[]
}

View File

@@ -0,0 +1,64 @@
import path from 'path'
import { FlowVersion, FlowVersionId, FlowVersionState, isNil, LATEST_FLOW_SCHEMA_VERSION } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { ApAxiosClient } from '../api/ap-axios'
import { engineApiService } from '../api/server-api.service'
import { cacheState } from './cache-state'
import { GLOBAL_CACHE_FLOWS_PATH } from './worker-cache'
/**
 * Disk-backed cache of flow versions keyed by flow version id. Only
 * LOCKED versions are persisted (drafts keep changing), and cached
 * entries are invalidated when the flow schema version moves forward.
 */
export const flowWorkerCache = (log: FastifyBaseLogger) => ({
    /** Returns the flow version, from cache when possible; null when the
     * version does not exist (404 from the API). */
    async getVersion({ engineToken, flowVersionId }: GetFlowRequest): Promise<FlowVersion | null> {
        try {
            const cache = cacheState(path.join(GLOBAL_CACHE_FLOWS_PATH, flowVersionId), log)
            const { state } = await cache.getOrSetCache({
                key: flowVersionId,
                cacheMiss: (flow: string) => {
                    if (isNil(flow)) {
                        return true
                    }
                    // Stale when serialized under an older flow schema.
                    const parsedFlow = JSON.parse(flow) as FlowVersion
                    return parsedFlow.schemaVersion !== LATEST_FLOW_SCHEMA_VERSION
                },
                installFn: async () => {
                    const startTime = performance.now()
                    const flowVersion = await engineApiService(engineToken).getFlowVersion({
                        versionId: flowVersionId,
                    })
                    log.info({
                        message: '[flowWorkerCache] Installing flow',
                        flowVersionId,
                        state: flowVersion?.state,
                        found: !isNil(flowVersion),
                        timeTaken: `${Math.floor(performance.now() - startTime)}ms`,
                    })
                    // A missing version serializes to the string "null" here.
                    return JSON.stringify(flowVersion)
                },
                // Never persist draft (non-LOCKED) versions — they mutate.
                skipSave: (flow: string) => {
                    if (isNil(flow)) {
                        return true
                    }
                    const parsedFlow = JSON.parse(flow) as FlowVersion
                    return parsedFlow.state !== FlowVersionState.LOCKED
                },
            })
            if (isNil(state)) {
                return null
            }
            // Note: JSON.parse("null") yields null, covering the
            // not-found case from installFn above.
            const flowVersion = JSON.parse(state as string) as FlowVersion
            return flowVersion
        }
        catch (e) {
            // A deleted/unknown flow version surfaces as a 404 — treat as absent.
            if (ApAxiosClient.isApAxiosError(e) && e.error.response && e.error.response.status === 404) {
                return null
            }
            throw e
        }
    },
})
// Lookup request for a cached flow version.
type GetFlowRequest = {
    engineToken: string
    flowVersionId: FlowVersionId
}

View File

@@ -0,0 +1,74 @@
import {
CommandOutput,
execPromise,
fileSystemUtils,
spawnWithKill,
} from '@activepieces/server-shared'
import { tryCatch } from '@activepieces/shared'
import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
/**
 * Wrapper around the `bun` CLI used for installing code-step dependencies
 * and bundling code steps into single CJS files.
 */
export const packageManager = (log: FastifyBaseLogger) => ({
    /** Sanity-checks that bun is on PATH and can run an install. */
    async validate(): Promise<void> {
        await execPromise('bun --version')
        await execPromise('bun install')
    },
    /**
     * Runs `bun install` at `path`, optionally restricted to workspace
     * filters. Filter paths are validated to avoid shell injection.
     * @throws when the install process fails or exceeds the timeout.
     */
    async install({ path, filtersPath }: InstallParams): Promise<CommandOutput> {
        const args = [
            '--ignore-scripts',
            '--linker isolated',
        ]
        const filters: string[] = filtersPath
            .map(sanitizeFilterPath)
            .map((filterPath) => `--filter ./${filterPath}`)
        await fileSystemUtils.threadSafeMkdir(path)
        log.debug({ path, args, filters }, '[PackageManager#install]')
        // Fix: `dayjs.duration(...)` requires dayjs's Duration plugin,
        // which is never registered in this module — use an explicit
        // millisecond constant (same value: 10 minutes) instead of relying
        // on plugin registration happening elsewhere.
        const TEN_MINUTES_MS = 10 * 60 * 1000
        const { error, data } = await tryCatch(async () => spawnWithKill({
            cmd: `bun install ${args.join(' ')} ${filters.join(' ')}`,
            options: {
                cwd: path,
            },
            printOutput: false,
            timeoutMs: TEN_MINUTES_MS,
        }))
        if (error) {
            log.error({ error }, '[PackageManager#install] Failed to install dependencies')
            throw error
        }
        return data
    },
    /** Bundles a code step's entry file into one CJS output with bun. */
    async build({ path, entryFile, outputFile }: BuildParams): Promise<CommandOutput> {
        const config = [
            `${entryFile}`,
            '--target node',
            '--production',
            '--format cjs',
            `--outfile ${outputFile}`,
        ]
        log.debug({ path, entryFile, outputFile, config }, '[PackageManager#build]')
        return execPromise(`bun build ${config.join(' ')}`, { cwd: path })
    },
})
// Validates a workspace filter path before it is interpolated into a
// shell command: only safe path characters, no leading dot (blocks
// relative-path escapes like '../').
const SAFE_FILTER_PATTERN = /^(?![.])[a-zA-Z0-9\-_.@/]+$/
const sanitizeFilterPath = (filterPath: string): string => {
    if (!SAFE_FILTER_PATTERN.test(filterPath)) {
        throw new Error(`Invalid filter path ${filterPath}`)
    }
    return filterPath
}
// Inputs for packageManager.install.
type InstallParams = {
    path: string
    filtersPath: string[]
}
// Inputs for packageManager.build.
type BuildParams = {
    path: string
    entryFile: string
    outputFile: string
}

View File

@@ -0,0 +1,97 @@
import path from 'path'
import { AppSystemProp, environmentVariables } from '@activepieces/server-shared'
import { ApEnvironment, EXACT_VERSION_REGEX, PackageType, PiecePackage, PieceType } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { engineApiService } from '../api/server-api.service'
import { workerMachine } from '../utils/machine'
import { cacheState, NO_SAVE_GUARD } from './cache-state'
import { GLOBAL_CACHE_PIECES_PATH } from './worker-cache'
/**
 * Disk-backed cache of piece packages keyed by name/version/platform.
 * Only exact versions are cached — range versions may resolve to a newer
 * release at any time and must always hit the API.
 */
export const pieceWorkerCache = (log: FastifyBaseLogger) => ({
    async getPiece({ engineToken, pieceName, pieceVersion, platformId }: GetPieceRequestQueryWorker): Promise<PiecePackage> {
        const isExactVersion = EXACT_VERSION_REGEX.test(pieceVersion)
        const skipRelativeVersions = !isExactVersion
        if (skipRelativeVersions) {
            // Non-pinned version: resolve fresh every time, no caching.
            return getPiecePackage({ engineToken, pieceName, pieceVersion, platformId })
        }
        const cacheKey = `${pieceName}-${pieceVersion}-${platformId}`
        const cache = cacheState(path.join(GLOBAL_CACHE_PIECES_PATH, cacheKey), log)
        const { state } = await cache.getOrSetCache({
            key: cacheKey,
            cacheMiss: (_: string) => {
                // Always refetch in tests, and for pieces under local
                // development (their metadata changes between builds).
                const environment = environmentVariables.getEnvironment(AppSystemProp.ENVIRONMENT)
                if (environment === ApEnvironment.TESTING) {
                    return true
                }
                const devPieces = workerMachine.getSettings().DEV_PIECES
                if (devPieces.includes(pieceName)) {
                    return true
                }
                return false
            },
            installFn: async () => {
                const startTime = performance.now()
                const piecePackage = await getPiecePackage({ engineToken, pieceName, pieceVersion, platformId })
                log.info({
                    message: '[pieceWorkerCache] Cached piece',
                    pieceName,
                    pieceVersion,
                    platformId,
                    timeTaken: `${Math.floor(performance.now() - startTime)}ms`,
                })
                return JSON.stringify(piecePackage)
            },
            skipSave: NO_SAVE_GUARD,
        })
        // state is always a saved JSON string here (NO_SAVE_GUARD never skips).
        return JSON.parse(state as string) as PiecePackage
    },
})
/**
 * Resolves a piece's metadata from the engine API and maps it onto the
 * PiecePackage shape expected by the sandbox provisioner.
 */
async function getPiecePackage(query: GetPieceRequestQueryWorker): Promise<PiecePackage> {
    const metadata = await engineApiService(query.engineToken).getPiece(query.pieceName, {
        version: query.pieceVersion,
    })
    const common = {
        packageType: metadata.packageType,
        pieceName: metadata.name,
        pieceVersion: metadata.version,
        pieceType: metadata.pieceType,
    }
    // Archived pieces carry the archive reference needed to download them.
    if (metadata.packageType === PackageType.ARCHIVE) {
        return {
            ...common,
            archiveId: metadata.archiveId!,
            platformId: query.platformId,
        } as PiecePackage
    }
    // Custom pieces are platform-scoped even when registry-installed.
    if (metadata.pieceType === PieceType.CUSTOM) {
        return {
            ...common,
            platformId: query.platformId,
        } as PiecePackage
    }
    return common as PiecePackage
}
// Piece lookup request, scoped by the engine token used for the API call.
type GetPieceRequestQueryWorker = PieceCacheKey & {
    engineToken: string
}
// Cache key dimensions: the same piece version may differ per platform.
type PieceCacheKey = {
    pieceName: string
    pieceVersion: string
    platformId: string
}

View File

@@ -0,0 +1,130 @@
import fs from 'fs/promises'
import { resolve } from 'path'
import { ApLock, filePiecesUtils, memoryLock, spawnWithKill } from '@activepieces/server-shared'
import { debounce, isNil, WebsocketClientEvent } from '@activepieces/shared'
import chalk from 'chalk'
import chokidar, { FSWatcher } from 'chokidar'
import { FastifyBaseLogger, FastifyInstance } from 'fastify'
import { Server } from 'socket.io'
import { cacheState } from '../../cache-state'
import { CacheState, GLOBAL_CACHE_COMMON_PATH } from '../../worker-cache'
import { devPiecesInstaller } from './dev-pieces-installer'
export const PIECES_BUILDER_MUTEX_KEY = 'pieces-builder'
/**
 * Determines which Nx target to use for building a piece. Prefers the
 * 'build-with-deps' target when the piece's project.json declares it;
 * falls back to plain 'build' when the file is missing, unreadable, or
 * has no such target.
 */
async function checkBuildTarget(nxProjectFilePath: string): Promise<string> {
    try {
        const nxProjectJson = JSON.parse(await fs.readFile(nxProjectFilePath, 'utf-8'))
        // Optional chaining covers both a missing `targets` object and a
        // non-object value in one expression.
        if (nxProjectJson?.targets?.['build-with-deps']) {
            return 'build-with-deps'
        }
        return 'build'
    }
    catch {
        // Unreadable or invalid project.json — use the default target.
        return 'build'
    }
}
/**
 * Rebuilds one dev piece after a source change: builds it with Nx under a
 * global mutex, relinks the shared Activepieces packages, invalidates the
 * relevant cache entries, and tells connected clients to refresh.
 * NOTE(review): the `packages` parameter is currently unused in this body
 * — confirm whether it can be dropped from the signature.
 */
async function handleFileChange(packages: string[], pieceName: string, packageName: string, nxProjectFilePath: string, io: Server, log: FastifyBaseLogger): Promise<void> {
    const pieceProjectName = `pieces-${pieceName}`
    log.info(
        chalk.blueBright.bold(
            '👀 Detected changes in pieces. Waiting... 👀 ' + pieceProjectName,
        ),
    )
    let lock: ApLock | undefined
    try {
        // Serialize builds: overlapping Nx invocations corrupt each other.
        lock = await memoryLock.acquire(PIECES_BUILDER_MUTEX_KEY)
        const buildTarget = await checkBuildTarget(nxProjectFilePath)
        log.info(chalk.blue.bold(`🤌 Building pieces with target: ${buildTarget} for ${pieceProjectName}... 🤌`))
        // Validate before interpolating the name into the shell command.
        if (!/^[A-Za-z0-9-]+$/.test(pieceProjectName)) {
            throw new Error(`Piece package name contains invalid character: ${pieceProjectName}`)
        }
        const startTime = Date.now()
        await spawnWithKill({ cmd: `npx nx run-many -t ${buildTarget} --projects=${pieceProjectName}`, printOutput: true })
        const endTime = Date.now()
        const buildTime = (endTime - startTime) / 1000
        log.info(chalk.blue.bold(`Build completed in ${buildTime.toFixed(2)} seconds`))
        await devPiecesInstaller(log).linkSharedActivepiecesPackagesToPiece(packageName)
        await devPiecesInstaller(log).linkSharedActivepiecesPackagesToEachOther()
        // Mark shared packages and this piece as pending so the worker
        // re-provisions them on the next execution.
        const cache = cacheState(GLOBAL_CACHE_COMMON_PATH, log)
        await cache.saveCache('@activepieces/pieces-framework', CacheState.PENDING)
        await cache.saveCache('@activepieces/pieces-common', CacheState.PENDING)
        await cache.saveCache('@activepieces/shared', CacheState.PENDING)
        await cache.saveCache(packageName, CacheState.PENDING)
        io.emit(WebsocketClientEvent.REFRESH_PIECE)
    }
    catch (error) {
        // Build failures are logged, not thrown: the watcher keeps running.
        log.info(error, chalk.red.bold('Failed to run build process...'))
    }
    finally {
        if (lock) {
            await lock.release()
        }
        log.info(
            chalk.green.bold(
                '✨ Changes are ready! Please refresh the frontend to see the new updates. ✨',
            ),
        )
    }
}
/**
 * Dev-mode entry point: installs piece dependencies, performs an initial
 * build per package, then watches each piece's source folder and rebuilds
 * (debounced, 2s) whenever a .ts file or package.json changes. All watchers
 * are closed via the fastify onClose hook.
 */
export async function devPiecesBuilder(app: FastifyInstance, io: Server, packages: string[]): Promise<void> {
    const watchers: FSWatcher[] = []
    await devPiecesInstaller(app.log).installPiecesDependencies(packages)
    for (const packageName of packages) {
        app.log.info(chalk.blue(`Starting watch for package: ${packageName}`))
        const pieceDirectory = await filePiecesUtils(app.log).findSourcePiecePathByPieceName(packageName)
        if (isNil(pieceDirectory)) {
            // No source folder for this package — skip it rather than fail startup.
            app.log.info(chalk.yellow(`Piece directory not found for package: ${packageName}`))
            continue
        }
        app.log.info(chalk.yellow(`Found piece directory: ${pieceDirectory}`))
        const packageJsonName = await filePiecesUtils(app.log).getPackageNameFromFolderPath(pieceDirectory)
        const nxProjectJson = await filePiecesUtils(app.log).getProjectJsonFromFolderPath(pieceDirectory)
        // Debounce bursts of file events (e.g. editor save-all) into one rebuild.
        const debouncedHandleFileChange = debounce(() => {
            handleFileChange(packages, packageName, packageJsonName, nxProjectJson, io, app.log).catch(app.log.error)
        }, 2000)
        // Initial build so the piece is usable before the first change arrives.
        await handleFileChange(packages, packageName, packageJsonName, nxProjectJson, io, app.log)
        const watcher = chokidar.watch(resolve(pieceDirectory), {
            ignored: [/^\./, /node_modules/, /dist/],
            persistent: true,
            ignoreInitial: true,
            // Wait until files stop growing before firing, so half-written
            // saves do not trigger builds.
            awaitWriteFinish: {
                stabilityThreshold: 2000,
                pollInterval: 200,
            },
        })
        watcher.on('all', (event, path) => {
            if (path.endsWith('.ts') || path.endsWith('package.json')) {
                debouncedHandleFileChange()
            }
        })
        watchers.push(watcher)
    }
    app.addHook('onClose', () => {
        for (const watcher of watchers) {
            watcher.close().catch(app.log.error)
        }
    })
}

View File

@@ -0,0 +1,91 @@
import { resolve } from 'node:path'
import { cwd } from 'node:process'
import { filePiecesUtils, spawnWithKill } from '@activepieces/server-shared'
import chalk from 'chalk'
import { FastifyBaseLogger } from 'fastify'
// Root of the compiled output for all packages (<cwd>/dist/packages).
const baseDistPath = resolve(cwd(), 'dist', 'packages')
/**
 * The shared workspace packages that pieces depend on, mapped to the dist
 * folders they are built into. These are linked locally via `bun link`
 * during development instead of being fetched from the registry.
 */
const sharedPiecesPackages = () => {
    const packages: Record<string, { path: string }> = {
        '@activepieces/pieces-framework': {
            path: resolve(baseDistPath, 'pieces', 'community', 'framework'),
        },
        '@activepieces/pieces-common': {
            path: resolve(baseDistPath, 'pieces', 'community', 'common'),
        },
        '@activepieces/shared': {
            // Consistency fix: reuse baseDistPath instead of re-deriving
            // resolve(cwd(), 'dist', 'packages', …) — the resulting path is identical.
            path: resolve(baseDistPath, 'shared'),
        },
    }
    return packages
}
/**
 * Dev-mode helper that installs piece dependencies and wires the locally
 * built shared @activepieces packages together via `bun link`.
 */
export const devPiecesInstaller = (log: FastifyBaseLogger) => {
    const utils = filePiecesUtils(log)
    /**
     * Collects the dependencies declared by every requested piece and installs
     * them all with a single `bun install` (no-save, silent). No-op when the
     * pieces declare no dependencies.
     */
    async function installPiecesDependencies(packageNames: string[]): Promise<void> {
        const deps = new Set<string>()
        for (const packageName of packageNames) {
            const folderPath = await utils.findSourcePiecePathByPieceName(packageName)
            if (!folderPath) continue
            const pieceDependencies = await utils.getPieceDependencies(folderPath)
            if (!pieceDependencies) continue
            Object.keys(pieceDependencies).forEach((key) => deps.add(`${key}@${pieceDependencies[key as keyof typeof pieceDependencies]}`))
        }
        if (deps.size > 0) {
            log.info(chalk.yellow(`Installing Pieces Dependencies: ${Array.from(deps).join(' ')}`))
            await spawnWithKill({ cmd: `bun install ${Array.from(deps).join(' ')} --no-save --silent`, printOutput: true })
        }
    }
    /**
     * Links every @activepieces/* dependency of the given (dist) piece to its
     * locally built counterpart. Individual link failures are logged, not thrown.
     */
    async function linkSharedActivepiecesPackagesToPiece(packageName: string) {
        const packagePath = await utils.findDistPiecePathByPackageName(packageName)
        if (!packagePath) return
        const dependencies = await utils.getPieceDependencies(packagePath)
        const apDependencies = Object.keys(dependencies ?? {}).filter(dep => dep.startsWith('@activepieces/') && packageName !== dep)
        // FIX: await all links. The previous `forEach(async ...)` returned before
        // the spawned `bun link` commands completed, racing with later build steps.
        await Promise.all(apDependencies.map(dependency =>
            spawnWithKill({ cmd: `bun link --cwd ${packagePath} --save ${dependency} --quiet`, printOutput: true }).catch(e => {
                log.error({
                    name: 'linkSharedActivepiecesPackagesToPiece',
                    message: JSON.stringify(e),
                }, 'Error linking shared activepieces packages to piece')
            }),
        ))
    }
    /** Registers each shared package as a global `bun link` target. */
    async function initSharedPackagesLinks() {
        await Promise.all(Object.values(sharedPiecesPackages()).map(pkg =>
            spawnWithKill({ cmd: `bun link --cwd ${pkg.path} --quiet`, printOutput: true }).catch(e => {
                log.error({
                    name: 'initSharedPackagesLinks',
                    message: JSON.stringify(e),
                }, 'Error initializing shared packages links')
            }),
        ))
    }
    /**
     * Registers link targets for the shared packages, then cross-links them to
     * each other so framework/common/shared all resolve to the local builds.
     */
    async function linkSharedActivepiecesPackagesToEachOther() {
        await initSharedPackagesLinks()
        const sharedPackageNames = Object.keys(sharedPiecesPackages())
        // FIX: await the cross-links (the previous `forEach(async ...)` did not).
        await Promise.all(sharedPackageNames.map(packageName => linkSharedActivepiecesPackagesToPiece(packageName)))
    }
    return {
        installPiecesDependencies,
        linkSharedActivepiecesPackagesToPiece,
        linkSharedActivepiecesPackagesToEachOther,
    }
}

View File

@@ -0,0 +1,337 @@
import { rm, writeFile } from 'node:fs/promises'
import path, { dirname, join } from 'node:path'
import { exceptionHandler, fileSystemUtils, memoryLock, pubsubFactory, redisHelper, rejectedPromiseHandler } from '@activepieces/server-shared'
import {
ExecutionMode,
groupBy,
isEmpty,
isNil,
PackageType,
PiecePackage,
PieceType,
PrivatePiecePackage,
tryCatch,
tryCatchSync,
} from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import writeFileAtomic from 'write-file-atomic'
import { workerApiService } from '../../../api/server-api.service'
import { workerMachine } from '../../../utils/machine'
import { workerRedisConnections } from '../../../utils/worker-redis'
import { packageManager } from '../../package-manager'
import { GLOBAL_CACHE_ALL_VERSIONS_PATH, GLOBAL_CACHE_COMMON_PATH, GLOBAL_CACHE_PATH_LATEST_VERSION } from '../../worker-cache'
// In-process cache of piece folders already verified as installed on disk.
const usedPiecesMemoryCache: Record<string, boolean> = {}
// Install location of a piece, relative to / under a workspace root folder.
const relativePiecePath = (piece: PiecePackage) => join('./', 'pieces', `${piece.pieceName}-${piece.pieceVersion}`)
const piecePath = (rootWorkspace: string, piece: PiecePackage) => join(rootWorkspace, 'pieces', `${piece.pieceName}-${piece.pieceVersion}`)
// Redis key prefix for persisted used pieces and the pub/sub channel used to
// tell other workers to install the same pieces.
const REDIS_USED_PIECES_CACHE_KEY = 'cache:pieces:v1'
const REDIS_INSTALL_PIECES_CHANNEL = 'install-pieces'
const pubsub = pubsubFactory(workerRedisConnections.create)
/**
 * Builds the Redis key under which a used piece is persisted: registry pieces
 * are keyed by name and version, archive pieces by their archive id.
 * Throws for any other package type.
 */
const redisUsedPiecesCacheKey = (piece: PiecePackage) => {
    if (piece.packageType === PackageType.REGISTRY) {
        return `${REDIS_USED_PIECES_CACHE_KEY}:registry:${piece.pieceName}:${piece.pieceVersion}`
    }
    if (piece.packageType === PackageType.ARCHIVE) {
        return `${REDIS_USED_PIECES_CACHE_KEY}:archive:${piece.archiveId}`
    }
    throw new Error('Invalid package type')
}
/**
 * Manages installation of piece packages into the worker's on-disk workspaces
 * and shares the set of used pieces across workers via Redis (MSET cache plus
 * a pub/sub install channel).
 */
export const registryPieceManager = (log: FastifyBaseLogger) => ({
    /**
     * Smoke-tests the install pipeline with a known official piece so the
     * worker fails fast at startup when installation is broken.
     */
    validate: async (): Promise<void> => {
        log.info('[registryPieceManager] Validating piece installation is working')
        const testPiece: PiecePackage = {
            packageType: PackageType.REGISTRY,
            pieceType: PieceType.OFFICIAL,
            pieceName: '@activepieces/piece-webhook',
            pieceVersion: '0.1.25',
        }
        // Best-effort cleanup so the test install starts from a clean folder.
        await tryCatch(async () => rollbackInstallation(GLOBAL_CACHE_COMMON_PATH, [testPiece]))
        const { error } = await tryCatch(async () => registryPieceManager(log).install({
            pieces: [testPiece],
            includeFilters: false,
            broadcast: false,
        }))
        if (error) {
            log.error({ error }, `[registryPieceManager] Piece installation is not working, try delete ${GLOBAL_CACHE_ALL_VERSIONS_PATH} folder and restart the server`)
            throw error
        }
    },
    /**
     * Installs pieces grouped by their destination workspace, persists pieces
     * found already installed on disk to Redis, and optionally broadcasts them
     * to other workers over pub/sub.
     */
    install: async ({ pieces, includeFilters, broadcast }: InstallParams): Promise<void> => {
        const groupedPieces = groupPiecesByPackagePath(log, pieces)
        const installPromises = Object.entries(groupedPieces).map(async ([packagePath, piecesInGroup]) => {
            const { piecesToPersistOnRedis } = await installPieces(log, packagePath, piecesInGroup, includeFilters)
            return piecesToPersistOnRedis
        })
        const piecesToPersistOnRedis = await Promise.all(installPromises).then(results => results.flat())
        await persistPiecesOnRedis(piecesToPersistOnRedis)
        if (broadcast) {
            await pubsub.publish(REDIS_INSTALL_PIECES_CHANNEL, JSON.stringify(piecesToPersistOnRedis))
        }
    },
    /**
     * Pre-installs every piece recorded in the Redis used-pieces cache.
     * Skipped when pre-warming is disabled on this machine.
     */
    warmup: async (): Promise<void> => {
        if (!workerMachine.preWarmCacheEnabled()) {
            log.info('[registryPieceManager] warmup cache is disabled')
            return
        }
        log.info('[registryPieceManager] Warming up pieces cache')
        const startTime = performance.now()
        const redis = await workerRedisConnections.useExisting()
        const usedPiecesKeys = await redisHelper.scanAll(redis, `${REDIS_USED_PIECES_CACHE_KEY}:*`)
        const usedPiecesValues = usedPiecesKeys.length > 0 ? await redis.mget(...usedPiecesKeys) : []
        // FIX: parse the non-nil values directly. The previous
        // `keys.filter((_k, i) => !isNil(values[i])).map((_k, i) => parse(values[i]))`
        // re-indexed after filtering, so whenever any value was nil the map read
        // a nil (or the wrong) entry and JSON.parse could throw or return the
        // wrong piece.
        const usedPieces = usedPiecesValues
            .filter((value): value is string => !isNil(value))
            .map(value => JSON.parse(value) as PiecePackage)
        await registryPieceManager(log).install({
            pieces: usedPieces,
            includeFilters: false,
            broadcast: true,
        })
        log.info({
            piecesCount: usedPieces.length,
            timeTaken: `${Math.floor(performance.now() - startTime)}ms`,
        }, '[registryPieceManager] Warmed up pieces cache')
    },
    /**
     * Subscribes to the install channel so pieces installed by other workers
     * are installed here too (without re-broadcasting).
     */
    distributedWarmup: async (): Promise<void> => {
        await pubsub.subscribe(REDIS_INSTALL_PIECES_CHANNEL, (message) => {
            log.debug('[registryPieceManager#subscribe] Received message from other worker to install pieces')
            const { data: pieces, error } = tryCatchSync(() => JSON.parse(message) as PiecePackage[])
            if (error) {
                exceptionHandler.handle(error, log)
                return
            }
            rejectedPromiseHandler(registryPieceManager(log).install({
                pieces,
                includeFilters: false,
                broadcast: false,
            }), log)
        })
    },
    /**
     * Resolves the workspace folder for platform-custom pieces; the location
     * depends on the configured execution mode.
     */
    getCustomPiecesPath: (platformId: string): string => {
        switch (workerMachine.getSettings().EXECUTION_MODE) {
            case ExecutionMode.SANDBOX_PROCESS:
            case ExecutionMode.SANDBOX_CODE_AND_PROCESS:
                return path.resolve(GLOBAL_CACHE_PATH_LATEST_VERSION, 'custom_pieces', platformId)
            case ExecutionMode.UNSANDBOXED:
            case ExecutionMode.SANDBOX_CODE_ONLY:
                return GLOBAL_CACHE_COMMON_PATH
            default:
                throw new Error('Invalid execution mode')
        }
    },
})
/**
 * Installs the given pieces into one workspace folder using a
 * check / lock / re-check sequence:
 *  1. cheap pre-check skips pieces already marked installed,
 *  2. acquire a per-workspace memory lock,
 *  3. re-check under the lock (a concurrent call may have installed some),
 *  4. write the workspace + per-piece package.json files, fetch archives,
 *     then run the package-manager install,
 *  5. on install failure, remove the piece folders and rethrow.
 *
 * Returns the pieces installed by this call plus the pieces found already
 * installed on disk (which still need persisting to Redis).
 */
async function installPieces(log: FastifyBaseLogger, rootWorkspace: string, pieces: PiecePackage[], includeFilters: boolean): Promise<PieceInstallationResult> {
    const { piecesToInstall, piecesToPersistOnRedis } = await partitionPiecesToInstallAndToPersist(rootWorkspace, pieces)
    if (isEmpty(piecesToInstall)) {
        log.debug({ rootWorkspace }, '[registryPieceManager] No new pieces to install (already installed)')
        return {
            piecesToInstall,
            piecesToPersistOnRedis,
        }
    }
    log.info({
        rootWorkspace,
        piecesToInstall: piecesToInstall.map(piece => `${piece.pieceName}-${piece.pieceVersion}`),
    }, '[registryPieceManager] Installing pieces in workspace')
    return memoryLock.runExclusive({
        key: `install-pieces-${rootWorkspace}`,
        fn: async () => {
            // Re-partition under the lock in case a concurrent call already
            // installed some of these pieces.
            // NOTE(review): the outer (pre-lock) piecesToPersistOnRedis is what
            // gets returned below, not the re-partitioned one — confirm intended.
            const { piecesToInstall } = await partitionPiecesToInstallAndToPersist(rootWorkspace, pieces)
            if (isEmpty(piecesToInstall)) {
                log.info({ rootWorkspace }, '[registryPieceManager] No new pieces to install in lock (already installed)')
                return {
                    piecesToInstall,
                    piecesToPersistOnRedis,
                }
            }
            log.info({
                rootWorkspace,
                pieces: piecesToInstall.map(piece => `${piece.pieceName}-${piece.pieceVersion}`),
            }, '[registryPieceManager] acquired lock and starting to install pieces')
            // Root package.json turns the folder into a workspace over pieces/**.
            await createRootPackageJson({
                path: rootWorkspace,
            })
            // Archive pieces need their .tgz on disk before install can resolve them.
            await savePackageArchivesToDiskIfNotCached(rootWorkspace, piecesToInstall)
            await Promise.all(piecesToInstall.map(piece => createPiecePackageJson({
                rootWorkspace,
                piecePackage: piece,
            })))
            const performanceStartTime = performance.now()
            const { error: installError } = await tryCatch(async () => packageManager(log).install({
                path: rootWorkspace,
                filtersPath: includeFilters ? piecesToInstall.map(relativePiecePath) : [],
            }))
            if (!isNil(installError)) {
                log.error({
                    rootWorkspace,
                    pieces: piecesToInstall.map(piece => `${piece.pieceName}-${piece.pieceVersion}`),
                    error: installError,
                }, '[registryPieceManager] Piece installation failed, rolling back')
                await rollbackInstallation(rootWorkspace, piecesToInstall)
                throw installError
            }
            // Drop 'ready' markers so later checks treat these pieces as installed.
            await markPiecesAsUsed(rootWorkspace, piecesToInstall)
            log.info({
                rootWorkspace,
                piecesCount: piecesToInstall.length,
                timeTaken: `${Math.floor(performance.now() - performanceStartTime)}ms`,
            }, '[registryPieceManager] Installed registry pieces using bun')
            return {
                piecesToInstall,
                piecesToPersistOnRedis,
            }
        },
    })
}
/**
 * Removes the install folders of the given pieces from the workspace,
 * undoing a failed installation attempt.
 */
async function rollbackInstallation(rootWorkspace: string, pieces: PiecePackage[]): Promise<void> {
    const removals = pieces.map((piece) => {
        const pieceFolder = path.resolve(rootWorkspace, relativePiecePath(piece))
        return rm(pieceFolder, { recursive: true })
    })
    await Promise.all(removals)
}
/**
 * Buckets pieces by the workspace folder they must be installed into:
 * archive pieces and platform-custom registry pieces go to the platform's
 * custom-pieces folder, everything else to the shared common cache.
 * Throws for an unknown package type.
 */
function groupPiecesByPackagePath(log: FastifyBaseLogger, pieces: PiecePackage[]): Record<string, PiecePackage[]> {
    const resolveWorkspacePath = (piece: PiecePackage): string => {
        switch (piece.packageType) {
            case PackageType.ARCHIVE:
                return registryPieceManager(log).getCustomPiecesPath(piece.platformId)
            case PackageType.REGISTRY: {
                if (piece.pieceType === PieceType.CUSTOM && !isNil(piece.platformId)) {
                    return registryPieceManager(log).getCustomPiecesPath(piece.platformId)
                }
                return GLOBAL_CACHE_COMMON_PATH
            }
            default:
                throw new Error('Invalid package type')
        }
    }
    return groupBy(pieces, resolveWorkspacePath)
}
/**
 * Downloads and writes the .tgz archive of every ARCHIVE-type piece into the
 * workspace, skipping non-archive pieces and archives that already exist.
 */
const savePackageArchivesToDiskIfNotCached = async (
    rootWorkspace: string,
    pieces: PiecePackage[],
): Promise<void> => {
    await Promise.all(pieces.map(async (piece) => {
        if (piece.packageType !== PackageType.ARCHIVE) {
            return
        }
        const archivePath = getPackageArchivePathForPiece(rootWorkspace, piece)
        const alreadyCached = await fileSystemUtils.fileExists(archivePath)
        if (alreadyCached) {
            return
        }
        await fileSystemUtils.threadSafeMkdir(dirname(archivePath))
        const archive = await workerApiService().getPieceArchive(piece.archiveId)
        await writeFile(archivePath, archive as Buffer)
    }))
}
/**
 * Writes the root package.json that turns the workspace folder into a
 * package workspace whose members live under pieces/. Written atomically so
 * concurrent installers never observe a partial file.
 */
async function createRootPackageJson({ path }: { path: string }): Promise<void> {
    const packageJsonPath = join(path, 'package.json')
    const rootManifest = {
        'name': 'fast-workspace',
        'version': '1.0.0',
        'workspaces': [
            'pieces/**',
        ],
    }
    await fileSystemUtils.threadSafeMkdir(dirname(packageJsonPath))
    await writeFileAtomic(packageJsonPath, JSON.stringify(rootManifest, null, 2), 'utf8')
}
/**
 * Writes the per-piece package.json inside the workspace. The single
 * dependency points at the registry version for REGISTRY pieces, or at the
 * downloaded .tgz archive path otherwise.
 */
async function createPiecePackageJson({ rootWorkspace, piecePackage }: {
    rootWorkspace: string
    piecePackage: PiecePackage
}): Promise<void> {
    const packageJsonPath = join(piecePath(rootWorkspace, piecePackage), 'package.json')
    const dependencySpec = piecePackage.packageType === PackageType.REGISTRY
        ? piecePackage.pieceVersion
        : getPackageArchivePathForPiece(rootWorkspace, piecePackage)
    const packageJson = {
        'name': `${piecePackage.pieceName}-${piecePackage.pieceVersion}`,
        'version': `${piecePackage.pieceVersion}`,
        'dependencies': {
            [piecePackage.pieceName]: dependencySpec,
        },
    }
    await fileSystemUtils.threadSafeMkdir(dirname(packageJsonPath))
    await writeFile(packageJsonPath, JSON.stringify(packageJson, null, 2), 'utf8')
}
/**
 * Splits pieces into those still needing installation and those already
 * installed on disk (which only need persisting to Redis so other workers
 * can warm their caches). Pieces known from the memory cache need neither.
 */
async function partitionPiecesToInstallAndToPersist(rootWorkspace: string, pieces: PiecePackage[]): Promise<PieceInstallationResult> {
    const checks = await Promise.all(pieces.map(async (piece) => ({
        piece,
        check: await pieceCheckIfAlreadyInstalled(rootWorkspace, piece),
    })))
    const piecesToInstall: PiecePackage[] = []
    const piecesToPersistOnRedis: PiecePackage[] = []
    for (const { piece, check } of checks) {
        if (!check.installed) {
            piecesToInstall.push(piece)
        }
        else if (check.source === 'disk') {
            piecesToPersistOnRedis.push(piece)
        }
    }
    return {
        piecesToInstall,
        piecesToPersistOnRedis,
    }
}
/**
 * Checks whether a piece is installed in the workspace: the in-process
 * memory cache is consulted first, then the 'ready' marker file on disk.
 * The disk answer is cached in memory for subsequent calls.
 */
async function pieceCheckIfAlreadyInstalled(rootWorkspace: string, piece: PiecePackage): Promise<PieceCheckIfAlreadyInstalledResult> {
    const pieceFolder = piecePath(rootWorkspace, piece)
    if (usedPiecesMemoryCache[pieceFolder]) {
        return {
            installed: true,
            source: 'memory',
        }
    }
    const readyMarkerExists = await fileSystemUtils.fileExists(join(pieceFolder, 'ready'))
    usedPiecesMemoryCache[pieceFolder] = readyMarkerExists
    return {
        installed: readyMarkerExists,
        source: 'disk',
    }
}
/**
 * Drops a 'ready' marker file into each installed piece folder so future
 * checks treat the piece as installed. Written atomically to avoid torn markers.
 */
async function markPiecesAsUsed(rootWorkspace: string, pieces: PiecePackage[]): Promise<void> {
    await Promise.all(pieces.map(piece =>
        writeFileAtomic(join(piecePath(rootWorkspace, piece), 'ready'), 'true'),
    ))
}
/**
 * Persists the given pieces to Redis in a single MSET so other workers can
 * discover and pre-install them on warmup. No-op for an empty list.
 */
async function persistPiecesOnRedis(pieces: PiecePackage[]): Promise<void> {
    if (isEmpty(pieces)) {
        return
    }
    const keyValuePairs = pieces.flatMap(piece => [redisUsedPiecesCacheKey(piece), JSON.stringify(piece)])
    const redis = await workerRedisConnections.useExisting()
    await redis.mset(keyValuePairs)
}
/**
 * Path of the downloaded .tgz archive for a private (archive) piece inside
 * its workspace install folder.
 */
function getPackageArchivePathForPiece(rootWorkspace: string, piecePackage: PrivatePiecePackage): string {
    const archiveFileName = `${piecePackage.archiveId}.tgz`
    return join(piecePath(rootWorkspace, piecePackage), archiveFileName)
}
// Arguments for registryPieceManager.install().
type InstallParams = {
    pieces: PiecePackage[]
    // When true, restrict the package-manager install to the pieces' own
    // relative paths (passed as filtersPath).
    includeFilters: boolean
    // When true, publish the persisted pieces over Redis pub/sub so other
    // workers install them too.
    broadcast: boolean
}
// Result of an installed-check, recording where the answer came from.
type PieceCheckIfAlreadyInstalledResult = {
    installed: boolean
    source: 'memory' | 'disk'
}
// Partition of pieces into those needing installation vs those already on
// disk that only need persisting to Redis.
type PieceInstallationResult = {
    piecesToInstall: PiecePackage[]
    piecesToPersistOnRedis: PiecePackage[]
}

View File

@@ -0,0 +1,38 @@
import { readdir, rm } from 'fs/promises'
import path from 'path'
import { exceptionHandler } from '@activepieces/server-shared'
import { FastifyBaseLogger } from 'fastify'
// Current on-disk cache layout version. workerCache.deleteStaleCache()
// removes every other version folder under the cache root.
export const LATEST_CACHE_VERSION = 'v7'
// Cache root (all versions) and the folders of the current version.
export const GLOBAL_CACHE_ALL_VERSIONS_PATH = path.resolve('cache')
export const GLOBAL_CACHE_PATH_LATEST_VERSION = path.resolve('cache', LATEST_CACHE_VERSION)
export const GLOBAL_CACHE_COMMON_PATH = path.resolve(GLOBAL_CACHE_PATH_LATEST_VERSION, 'common')
export const GLOBAL_CODE_CACHE_PATH = path.resolve(GLOBAL_CACHE_PATH_LATEST_VERSION, 'codes')
export const GLOBAL_CACHE_PIECES_PATH = path.resolve(GLOBAL_CACHE_PATH_LATEST_VERSION, 'pieces-metadata')
export const GLOBAL_CACHE_FLOWS_PATH = path.resolve(GLOBAL_CACHE_PATH_LATEST_VERSION, 'flows')
// Entry point of the engine bundle inside the common cache folder.
export const ENGINE_PATH = path.join(GLOBAL_CACHE_COMMON_PATH, 'main.js')
// Cache entry states. The dev pieces builder saves PENDING for rebuilt
// packages; presumably consumers refresh PENDING entries — confirm semantics.
export enum CacheState {
    READY = 'READY',
    PENDING = 'PENDING',
}
/**
 * Maintenance helpers for the worker's on-disk cache.
 */
export const workerCache = (log: FastifyBaseLogger) => ({
    /**
     * Deletes every versioned cache directory except the latest one.
     * Errors are routed to the exception handler, never thrown.
     */
    async deleteStaleCache(): Promise<void> {
        try {
            const cacheDir = path.resolve(GLOBAL_CACHE_ALL_VERSIONS_PATH)
            const entries = await readdir(cacheDir, { withFileTypes: true })
            const staleEntries = entries.filter(entry => entry.isDirectory() && entry.name !== LATEST_CACHE_VERSION)
            for (const staleEntry of staleEntries) {
                await rm(path.join(cacheDir, staleEntry.name), { recursive: true })
            }
        }
        catch (error) {
            exceptionHandler.handle(error, log)
        }
    },
})

View File

@@ -0,0 +1,186 @@
import { assertNotNullOrUndefined, EngineResponse, EngineSocketEvent, EngineStderr, EngineStdout, isNil, SendFlowResponseRequest, UpdateRunProgressRequest, UpdateStepProgressRequest } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { type Socket, Server as SocketIOServer } from 'socket.io'
// Socket.IO server accepting engine-process connections; null until init().
let io: SocketIOServer | null = null
// Connected engine sockets, keyed by workerId.
const sockets: Record<string, Socket> = {}
// Pending waitForConnect() resolvers, keyed by workerId.
const resolvePromises: Record<string, (value: boolean) => void> = {}
/**
 * Socket.IO server the engine worker processes connect to. Tracks one socket
 * per workerId (module-level state above) and lets the runner send engine
 * operations and subscribe to responses, stdio, and progress events.
 */
export const engineRunnerSocket = (log: FastifyBaseLogger) => {
    return {
        /**
         * Starts the Socket.IO server on path /worker/ws, port 12345
         * (hard-coded), registers connection/disconnect/error handling, and
         * shuts down on SIGTERM/SIGINT.
         */
        async init(): Promise<void> {
            try {
                io = new SocketIOServer({
                    path: '/worker/ws',
                    // Allow large payloads (1e8 bytes) for engine operations/results.
                    maxHttpBufferSize: 1e8,
                })
                io.listen(12345)
                io.on('connection', (socket: Socket) => {
                    const workerId = socket.handshake.auth['workerId'] as string
                    log.info('Client connected to engine socket server ' + workerId)
                    // Clean up any existing socket for this workerId
                    if (sockets[workerId]) {
                        this.cleanupSocket(workerId)
                    }
                    sockets[workerId] = socket
                    // Wake any pending waitForConnect() for this worker.
                    if (!isNil(resolvePromises[workerId])) {
                        resolvePromises[workerId](true)
                        // eslint-disable-next-line @typescript-eslint/no-dynamic-delete
                        delete resolvePromises[workerId]
                    }
                    socket.on('disconnect', () => {
                        log.info({ workerId }, 'Client disconnected from engine socket server')
                        this.cleanupSocket(workerId)
                    })
                    socket.on('error', (error) => {
                        log.error({ error, workerId }, 'Socket error occurred')
                        this.cleanupSocket(workerId)
                    })
                })
                process.on('SIGTERM', () => {
                    void this.disconnect()
                })
                process.on('SIGINT', () => {
                    void this.disconnect()
                })
            }
            catch (error) {
                log.error({ error }, 'Failed to initialize socket server')
                throw error
            }
        },
        // True when a socket is registered for workerId and currently connected.
        isConnected(workerId: string): boolean {
            const socket = sockets[workerId]
            return !isNil(socket) && socket.connected
        },
        /**
         * Detaches and disconnects the socket for workerId, dropping any
         * pending waitForConnect resolver (which will then hit its timeout).
         */
        cleanupSocket(workerId: string): void {
            const socket = sockets[workerId]
            if (socket) {
                socket.removeAllListeners()
                socket.disconnect()
                // eslint-disable-next-line @typescript-eslint/no-dynamic-delete
                delete sockets[workerId]
                // eslint-disable-next-line @typescript-eslint/no-dynamic-delete
                delete resolvePromises[workerId]
            }
        },
        /**
         * Resolves true once the worker's socket connects, or false after a
         * 30s timeout. When a socket is already registered, returns its
         * current connected state immediately.
         * NOTE(review): a registered-but-disconnected socket short-circuits
         * to false without waiting — confirm this is intended.
         */
        async waitForConnect(workerId: string): Promise<boolean> {
            if (!isNil(sockets[workerId])) {
                return sockets[workerId].connected
            }
            const promise = new Promise<boolean>((resolve) => {
                const timeout = setTimeout(() => {
                    // eslint-disable-next-line @typescript-eslint/no-dynamic-delete
                    delete resolvePromises[workerId]
                    resolve(false)
                }, 30000)
                resolvePromises[workerId] = (value: boolean): void => {
                    // eslint-disable-next-line @typescript-eslint/no-dynamic-delete
                    delete resolvePromises[workerId]
                    clearTimeout(timeout)
                    resolve(value)
                }
            })
            return promise
        },
        /**
         * Emits an engine operation to the worker's socket.
         * Throws when no socket is registered or it is not connected.
         */
        send(workerId: string, message: unknown): void {
            const socket = sockets[workerId]
            assertNotNullOrUndefined(socket, 'socket')
            if (!socket.connected) {
                throw new Error(`Socket for worker ${workerId} is not connected`)
            }
            socket.emit(EngineSocketEvent.ENGINE_OPERATION, message)
        },
        /**
         * Registers handlers for every engine-originated event on the worker's
         * socket. Existing handlers are removed first so repeated subscribe
         * calls never double-invoke callbacks. Events carrying an ack callback
         * invoke it after the handler runs.
         */
        subscribe({
            workerId,
            onResult,
            onStdout,
            onStderr,
            updateRunProgress,
            updateStepProgress,
            sendFlowResponse,
        }: {
            workerId: string
            onResult: (result: EngineResponse<unknown>) => void
            onStdout: (stdout: EngineStdout) => void
            onStderr: (stderr: EngineStderr) => void
            updateRunProgress: (data: UpdateRunProgressRequest, log: FastifyBaseLogger) => Promise<void>
            updateStepProgress: (data: UpdateStepProgressRequest, log: FastifyBaseLogger) => Promise<void>
            sendFlowResponse: (data: SendFlowResponseRequest, log: FastifyBaseLogger) => Promise<void>
        }): void {
            const socket = sockets[workerId]
            assertNotNullOrUndefined(socket, 'sockets[workerId]')
            // Remove any existing listeners before adding new ones
            this.unsubscribe(workerId)
            socket.on(EngineSocketEvent.ENGINE_RESPONSE, (data: EngineResponse<unknown>, callback: () => void) => {
                onResult(data)
                callback?.()
            })
            socket.on(EngineSocketEvent.ENGINE_STDOUT, (data: EngineStdout) => {
                onStdout(data)
            })
            socket.on(EngineSocketEvent.ENGINE_STDERR, (data: EngineStderr, callback: () => void) => {
                onStderr(data)
                callback?.()
            })
            socket.on(EngineSocketEvent.UPDATE_RUN_PROGRESS, async (data: UpdateRunProgressRequest, callback: () => void) => {
                await updateRunProgress(data, log)
                callback()
            })
            socket.on(EngineSocketEvent.UPDATE_STEP_PROGRESS, async (data: UpdateStepProgressRequest, callback: () => void) => {
                await updateStepProgress(data, log)
                callback()
            })
            socket.on(EngineSocketEvent.SEND_FLOW_RESPONSE, async (data: SendFlowResponseRequest, callback: () => void) => {
                await sendFlowResponse(data, log)
                callback()
            })
        },
        // Removes the engine-event listeners added by subscribe() for workerId.
        unsubscribe(workerId: string): void {
            const socket = sockets[workerId]
            if (socket) {
                socket.removeAllListeners(EngineSocketEvent.ENGINE_RESPONSE)
                socket.removeAllListeners(EngineSocketEvent.ENGINE_STDOUT)
                socket.removeAllListeners(EngineSocketEvent.ENGINE_STDERR)
                socket.removeAllListeners(EngineSocketEvent.UPDATE_RUN_PROGRESS)
                socket.removeAllListeners(EngineSocketEvent.UPDATE_STEP_PROGRESS)
                socket.removeAllListeners(EngineSocketEvent.SEND_FLOW_RESPONSE)
            }
        },
        // Cleans up every worker socket and closes the server (used on shutdown).
        async disconnect(): Promise<void> {
            if (io) {
                // Clean up all sockets
                Object.keys(sockets).forEach(workerId => {
                    this.cleanupSocket(workerId)
                })
                await io.close()
                io = null
            }
        },
    }
}

View File

@@ -0,0 +1,65 @@
import {
DropdownState,
DynamicPropsValue,
PieceMetadata,
PropertyType,
} from '@activepieces/pieces-framework'
import { EngineResponseStatus, ExecuteActionResponse, ExecuteToolResponse, ExecuteTriggerResponse, ExecuteValidateAuthResponse, FlowVersionState, SourceCode, TriggerHookType } from '@activepieces/shared'
// A code step's source bundle plus the flow version it belongs to (used when
// provisioning code into the execution sandbox).
export type CodeArtifact = {
    name: string
    sourceCode: SourceCode
    flowVersionId: string
    flowVersionState: FlowVersionState
}
// Flow runs return an empty object as their direct engine result.
export type EngineHelperFlowResult = Record<string, never>
// Result of running a trigger hook of the given hook type.
export type EngineHelperTriggerResult<
    T extends TriggerHookType = TriggerHookType,
> = ExecuteTriggerResponse<T>
// Result of resolving a piece property: dropdown options or dynamic props.
export type EngineHelperPropResult = {
    type: PropertyType.DROPDOWN
    options: DropdownState<unknown>
} | {
    type: PropertyType.DYNAMIC
    options: Record<string, DynamicPropsValue>
}
export type EngineHelperActionResult = ExecuteActionResponse
export type EngineHelperToolResult = ExecuteToolResponse
export type EngineHelperValidateAuthResult = ExecuteValidateAuthResponse
// Code steps share the action response shape.
export type EngineHelperCodeResult = ExecuteActionResponse
export type EngineHelperExtractPieceInformation = PieceMetadata
// Union of every payload an engine operation can produce.
export type EngineHelperResult =
    | EngineHelperFlowResult
    | EngineHelperTriggerResult
    | EngineHelperPropResult
    | EngineHelperToolResult
    | EngineHelperCodeResult
    | EngineHelperExtractPieceInformation
    | EngineHelperActionResult
    | EngineHelperValidateAuthResult
// Engine response envelope: status, typed result, captured stdio, and an
// optional delay in seconds.
export type EngineHelperResponse<Result extends EngineHelperResult> = {
    status: EngineResponseStatus
    result: Result
    standardError: string
    standardOutput: string
    delayInSeconds?: number
}
// Raw outcome of a sandboxed engine run, including wall-clock time in seconds.
export type ExecuteSandboxResult = {
    output: unknown
    timeInSeconds: number
    verdict: EngineResponseStatus
    standardOutput: string
    standardError: string
}

View File

@@ -0,0 +1,262 @@
import { webhookSecretsUtils } from '@activepieces/server-shared'
import { ActivepiecesError, AgentPieceProps, AgentToolType, AI_PIECE_NAME, BeginExecuteFlowOperation, CodeAction, EngineOperation, EngineOperationType, EngineResponseStatus, ErrorCode, ExecuteExtractPieceMetadataOperation, ExecuteFlowOperation, ExecutePropsOptions, ExecuteTriggerOperation, ExecuteValidateAuthOperation, FlowActionType, flowStructureUtil, FlowTriggerType, FlowVersion, PieceActionSettings, PieceTriggerSettings, ResumeExecuteFlowOperation, TriggerHookType } from '@activepieces/shared'
import { trace } from '@opentelemetry/api'
import chalk from 'chalk'
import { FastifyBaseLogger } from 'fastify'
import { executionFiles } from '../cache/execution-files'
import { pieceWorkerCache } from '../cache/piece-worker-cache'
import { workerMachine } from '../utils/machine'
import { webhookUtils } from '../utils/webhook-utils'
import { CodeArtifact, EngineHelperExtractPieceInformation, EngineHelperFlowResult, EngineHelperPropResult, EngineHelperResponse, EngineHelperResult, EngineHelperTriggerResult, EngineHelperValidateAuthResult } from './engine-runner-types'
import { engineProcessManager } from './process/engine-process-manager'
// OpenTelemetry tracer used to instrument engine executions with spans.
const tracer = trace.getTracer('engine-runner')
// Connection fields injected by the runner itself; callers never supply them.
type EngineConstants = 'publicApiUrl' | 'internalApiUrl' | 'engineToken'
/**
 * High-level engine execution API for the worker: provisions sandbox files
 * (pieces + code artifacts), builds the engine operation — injecting API URLs
 * and the engine token — and delegates to the engine process manager.
 */
export const engineRunner = (log: FastifyBaseLogger) => ({
    /** Runs (begin) or resumes a full flow execution inside the engine. */
    async executeFlow(engineToken: string, operation: Omit<BeginExecuteFlowOperation, EngineConstants> | Omit<ResumeExecuteFlowOperation, EngineConstants>): Promise<EngineHelperResponse<EngineHelperFlowResult>> {
        return tracer.startActiveSpan('engineRunner.executeFlow', {
            attributes: {
                'flow.versionId': operation.flowVersion.id,
                'flow.projectId': operation.projectId,
                'flow.platformId': operation.platformId,
            },
        }, async (span) => {
            try {
                log.debug({
                    flowVersion: operation.flowVersion.id,
                    projectId: operation.projectId,
                }, '[threadEngineRunner#executeFlow]')
                // Ensure every piece and code step of the flow is on disk
                // before handing the operation to the engine.
                await prepareFlowSandbox(log, engineToken, operation.flowVersion, operation.projectId, operation.platformId)
                const input: ExecuteFlowOperation = {
                    ...operation,
                    engineToken,
                    publicApiUrl: workerMachine.getPublicApiUrl(),
                    internalApiUrl: workerMachine.getInternalApiUrl(),
                }
                return await execute<EngineHelperFlowResult>(log, input, EngineOperationType.EXECUTE_FLOW, operation.timeoutInSeconds)
            }
            finally {
                span.end()
            }
        })
    },
    /**
     * Executes a trigger hook for the flow's trigger piece, resolving the
     * piece from the worker cache and building webhook URLs and the webhook
     * secret before running.
     */
    async executeTrigger<T extends TriggerHookType>(engineToken: string, operation: Omit<ExecuteTriggerOperation<T>, EngineConstants>): Promise<EngineHelperResponse<EngineHelperTriggerResult<T>>> {
        log.debug({
            hookType: operation.hookType,
            projectId: operation.projectId,
        }, '[threadEngineRunner#executeTrigger]')
        const triggerSettings = operation.flowVersion.trigger.settings as PieceTriggerSettings
        const triggerPiece = await pieceWorkerCache(log).getPiece({
            engineToken,
            pieceName: triggerSettings.pieceName,
            pieceVersion: triggerSettings.pieceVersion,
            platformId: operation.platformId,
        })
        const input: ExecuteTriggerOperation<TriggerHookType> = {
            platformId: operation.platformId,
            projectId: operation.projectId,
            hookType: operation.hookType,
            webhookUrl: operation.webhookUrl,
            triggerPayload: operation.triggerPayload,
            test: operation.test,
            flowVersion: operation.flowVersion,
            appWebhookUrl: await webhookUtils(log).getAppWebhookUrl({
                appName: triggerPiece.pieceName,
                publicApiUrl: workerMachine.getPublicApiUrl(),
            }),
            publicApiUrl: workerMachine.getPublicApiUrl(),
            internalApiUrl: workerMachine.getInternalApiUrl(),
            webhookSecret: await webhookSecretsUtils.getWebhookSecret(operation.flowVersion),
            engineToken,
            timeoutInSeconds: operation.timeoutInSeconds,
        }
        // Only the trigger piece is needed for a trigger hook.
        await executionFiles(log).provision({
            pieces: [triggerPiece],
            codeSteps: [],
        })
        return execute(log, input, EngineOperationType.EXECUTE_TRIGGER_HOOK, operation.timeoutInSeconds)
    },
    /** Provisions a piece and asks the engine to extract its metadata. */
    async extractPieceMetadata(operation: ExecuteExtractPieceMetadataOperation): Promise<EngineHelperResponse<EngineHelperExtractPieceInformation>> {
        log.debug({ operation }, '[threadEngineRunner#extractPieceMetadata]')
        await executionFiles(log).provision({
            pieces: [operation],
            codeSteps: [],
        })
        return execute(log, operation, EngineOperationType.EXTRACT_PIECE_METADATA, operation.timeoutInSeconds)
    },
    /** Validates a piece connection's auth configuration inside the engine. */
    async executeValidateAuth(engineToken: string, operation: Omit<ExecuteValidateAuthOperation, EngineConstants>): Promise<EngineHelperResponse<EngineHelperValidateAuthResult>> {
        log.debug({ ...operation.piece, platformId: operation.platformId }, '[threadEngineRunner#executeValidateAuth]')
        await executionFiles(log).provision({
            pieces: [operation.piece],
            codeSteps: [],
        })
        const input: ExecuteValidateAuthOperation = {
            ...operation,
            publicApiUrl: workerMachine.getPublicApiUrl(),
            internalApiUrl: workerMachine.getInternalApiUrl(),
            engineToken,
        }
        return execute(log, input, EngineOperationType.EXECUTE_VALIDATE_AUTH, operation.timeoutInSeconds)
    },
    /** Resolves a dynamic piece property (dropdown options / dynamic props). */
    async executeProp(engineToken: string, operation: Omit<ExecutePropsOptions, EngineConstants>): Promise<EngineHelperResponse<EngineHelperPropResult>> {
        log.debug({
            piece: operation.piece,
            propertyName: operation.propertyName,
            stepName: operation.actionOrTriggerName,
        }, '[threadEngineRunner#executeProp]')
        await executionFiles(log).provision({
            pieces: [operation.piece],
            codeSteps: [],
        })
        const input: ExecutePropsOptions = {
            ...operation,
            publicApiUrl: workerMachine.getPublicApiUrl(),
            internalApiUrl: workerMachine.getInternalApiUrl(),
            engineToken,
        }
        return execute(log, input, EngineOperationType.EXECUTE_PROPERTY, operation.timeoutInSeconds)
    },
    /** Shuts down all engine worker processes via the process manager. */
    async shutdownAllWorkers(): Promise<void> {
        await engineProcessManager.shutdown()
    },
})
/**
 * Provisions everything a flow needs on disk before execution: the piece
 * package of every piece step (plus, for the AI piece, the piece packages
 * backing its agent tools) and the code artifacts of every code step.
 * The whole preparation is traced as one span.
 */
async function prepareFlowSandbox(log: FastifyBaseLogger, engineToken: string, flowVersion: FlowVersion, projectId: string, platformId: string): Promise<void> {
    return tracer.startActiveSpan('prepareFlowSandbox', {
        attributes: {
            'sandbox.flowVersionId': flowVersion.id,
            'sandbox.projectId': projectId,
            'sandbox.platformId': platformId,
        },
    }, async (span) => {
        try {
            const steps = flowStructureUtil.getAllSteps(flowVersion.trigger)
            const pieceSteps = steps.filter((step) => step.type === FlowTriggerType.PIECE || step.type === FlowActionType.PIECE)
            span.setAttribute('sandbox.pieceStepsCount', pieceSteps.length)
            // Collect the getPiece() promise of each piece step; they are all
            // awaited together just before provisioning.
            const flowPieces = pieceSteps.map((step) => {
                const { pieceName, pieceVersion } = step.settings as PieceTriggerSettings | PieceActionSettings
                const pieces = [ pieceWorkerCache(log).getPiece({
                    engineToken,
                    pieceName,
                    pieceVersion,
                    platformId,
                })]
                // The AI piece can carry agent tools that are themselves
                // pieces; those must be provisioned as well.
                if (pieceName === AI_PIECE_NAME) {
                    const agentTools = step.settings.input?.[AgentPieceProps.AGENT_TOOLS]
                    for (const tool of agentTools ?? []) {
                        if (tool.type === AgentToolType.PIECE) {
                            pieces.push(pieceWorkerCache(log).getPiece({
                                engineToken,
                                platformId: tool.platformId,
                                pieceName: tool.pieceMetadata.pieceName,
                                pieceVersion: tool.pieceMetadata.pieceVersion,
                            }),
                            )
                        }
                    }
                }
                return pieces
            })
            const codeSteps = getCodePieces(flowVersion)
            span.setAttribute('sandbox.codeStepsCount', codeSteps.length)
            await executionFiles(log).provision({
                pieces: await Promise.all(flowPieces.flat()),
                codeSteps,
            })
        }
        finally {
            span.end()
        }
    })
}
/** Collects a CodeArtifact descriptor for every CODE step in the given flow version. */
function getCodePieces(flowVersion: FlowVersion): CodeArtifact[] {
    const artifacts: CodeArtifact[] = []
    for (const step of flowStructureUtil.getAllSteps(flowVersion.trigger)) {
        if (step.type !== FlowActionType.CODE) {
            continue
        }
        const codeAction = step as CodeAction
        artifacts.push({
            name: codeAction.name,
            flowVersionId: flowVersion.id,
            flowVersionState: flowVersion.state,
            sourceCode: codeAction.settings.sourceCode,
        })
    }
    return artifacts
}
/**
 * Runs a single engine operation in a pooled worker process and normalizes the outcome.
 *
 * Timeout and out-of-memory results are surfaced as typed ActivepiecesError values
 * (EXECUTION_TIMEOUT / MEMORY_ISSUE) carrying the captured stdout/stderr; any other
 * status is returned to the caller together with the parsed engine response.
 */
async function execute<Result extends EngineHelperResult>(log: FastifyBaseLogger, operation: EngineOperation, operationType: EngineOperationType, timeoutInSeconds: number): Promise<EngineHelperResponse<Result>> {
    return tracer.startActiveSpan('engineRunner.execute', {
        attributes: {
            'engine.operationType': operationType,
            'engine.timeoutInSeconds': timeoutInSeconds,
        },
    }, async (span) => {
        try {
            const { engine, stdError, stdOut } = await engineProcessManager.executeTask(operationType, operation, log, timeoutInSeconds)
            // This is logged on every run (success included), so the message must not claim an error occurred.
            log.debug({
                stdError: chalk.red(stdError),
                stdOut: chalk.green(stdOut),
            }, '[engineRunner#execute] engine output')
            span.setAttribute('engine.responseStatus', engine.status)
            if (engine.status === EngineResponseStatus.TIMEOUT) {
                span.recordException(new Error('Execution timeout'))
                throw new ActivepiecesError({
                    code: ErrorCode.EXECUTION_TIMEOUT,
                    params: {
                        standardOutput: stdOut,
                        standardError: stdError,
                    },
                })
            }
            if (engine.status === EngineResponseStatus.MEMORY_ISSUE) {
                span.recordException(new Error('Memory issue'))
                throw new ActivepiecesError({
                    code: ErrorCode.MEMORY_ISSUE,
                    params: {
                        standardOutput: stdOut,
                        standardError: stdError,
                    },
                })
            }
            // Engine responses arrive as JSON strings; fall back to the raw value when parsing fails.
            const result = tryParseJson(engine.response)
            return {
                status: engine.status,
                delayInSeconds: engine.delayInSeconds,
                result: result as Result,
                standardError: stdError,
                standardOutput: stdOut,
            }
        }
        finally {
            span.end()
        }
    })
}
/**
 * Attempts to parse a value as a JSON string; returns the original value
 * untouched when it is not valid JSON (or not a string at all).
 */
function tryParseJson(value: unknown): unknown {
    try {
        return JSON.parse(value as string)
    }
    // Optional catch binding: the error itself is irrelevant, only the fallback matters.
    catch {
        return value
    }
}

View File

@@ -0,0 +1,380 @@
import { ChildProcess } from 'child_process'
import { ApSemaphore } from '@activepieces/server-shared'
import { ApEnvironment, assertNotNullOrUndefined, EngineOperation, EngineOperationType, EngineResponse, EngineResponseStatus, EngineStderr, EngineStdout, ExecuteFlowOperation, ExecutePropsOptions, ExecuteTriggerOperation, ExecutionMode, isNil, TriggerHookType } from '@activepieces/shared'
import { trace } from '@opentelemetry/api'
import { FastifyBaseLogger } from 'fastify'
import { nanoid } from 'nanoid'
import treeKill from 'tree-kill'
import { workerMachine } from '../../utils/machine'
import { engineRunnerSocket } from '../engine-runner-socket'
import { engineSocketHandlers } from './engine-socket-handlers'
import { EngineProcessOptions } from './factory/engine-factory-types'
import { engineProcessFactory } from './factory/index'
const tracer = trace.getTracer('engine-process-manager')
/** Outcome of one engine task: the engine's response plus the captured process output. */
export type WorkerResult = {
    engine: EngineResponse<unknown>
    stdOut: string
    stdError: string
}
// Module-level worker-pool state, populated by engineProcessManager.init().
// `processes[i]` is undefined until slot i is (re)provisioned with a child process.
let processes: (ChildProcess | undefined)[] = []
// Indexes of pool slots currently free to pick up a task.
let availableProcessIndexes: number[] = []
// Per-slot socket identifiers; regenerated whenever a slot's process is replaced.
let processIds: string[] = []
// Spawn options shared by all engine processes (set in init()).
let options: EngineProcessOptions
// Bounds concurrent tasks to the pool size.
let lock: ApSemaphore
let engineSocketServer: ReturnType<typeof engineRunnerSocket>
// Guards init() against running twice.
let initialized = false
/**
 * Pool of engine child processes. A semaphore bounds concurrency to the pool
 * size; a slot's process is lazily (re)provisioned when it is missing,
 * disconnected, or when workers are not reusable under the current settings.
 */
export const engineProcessManager = {
    // Sets up pool state for `_maxWorkers` slots; subsequent calls are no-ops.
    init(_maxWorkers: number, _options: EngineProcessOptions, log: FastifyBaseLogger) {
        if (initialized) {
            return
        }
        options = _options
        processes = []
        availableProcessIndexes = []
        lock = new ApSemaphore(_maxWorkers)
        engineSocketServer = engineRunnerSocket(log)
        processIds = []
        // Create the initial workers
        for (let i = 0; i < _maxWorkers; i++) {
            processes.push(undefined)
            availableProcessIndexes.push(i)
            processIds.push(nanoid())
        }
        initialized = true
    },
    // Number of pool slots currently idle.
    getFreeSandboxes(): number {
        return availableProcessIndexes.length
    },
    // Total pool size.
    getTotalSandboxes(): number {
        return processes.length
    },
    /**
     * Acquires a free slot, (re)creates its engine process when needed, runs the
     * operation through it, and always returns the slot to the pool afterwards.
     */
    async executeTask(operationType: EngineOperationType, operation: EngineOperation, log: FastifyBaseLogger, timeoutInSeconds: number): Promise<WorkerResult> {
        log.trace({
            operationType,
            operation,
        }, 'Executing operation')
        await lock.acquire()
        const workerIndex = availableProcessIndexes.pop()
        try {
            log.debug({
                workerIndex,
            }, 'Acquired worker')
            assertNotNullOrUndefined(workerIndex, 'Worker index should not be undefined')
            // A slot is unusable when its process is missing/disconnected, or when
            // processes are single-use in the current mode (see isWorkerReusable).
            const workerIsDisconnected = isNil(processes[workerIndex]) || !engineSocketServer.isConnected(processIds[workerIndex])
            const workerIsDead = workerIsDisconnected || !isWorkerReusable()
            if (workerIsDead) {
                await tracer.startActiveSpan('engineProcessManager.provisionWorker', {
                    attributes: {
                        'worker.index': workerIndex,
                        'worker.isDisconnected': workerIsDisconnected,
                        'worker.isReusable': isWorkerReusable(),
                    },
                }, async (span) => {
                    try {
                        log.info({
                            workerIndex,
                            workerIsDisconnected,
                        }, 'Worker is not available, creating a new one')
                        // Kill any stale process and rotate the slot's socket id before respawning.
                        if (!isNil(processes[workerIndex])) {
                            await forceTerminate(processes[workerIndex], log)
                            processIds[workerIndex] = nanoid()
                        }
                        const workerId = processIds[workerIndex]
                        const startTime = performance.now()
                        await tracer.startActiveSpan('engineProcessManager.createProcess', {
                            attributes: {
                                'worker.id': workerId,
                                'worker.index': workerIndex,
                            },
                        }, async (createSpan) => {
                            try {
                                processes[workerIndex] = await engineProcessFactory(log).create({
                                    workerId,
                                    workerIndex,
                                    platformId: operation.platformId,
                                    flowVersionId: getFlowVersionId(operation, operationType),
                                    options,
                                    reusable: isWorkerReusable(),
                                })
                                const processCreationTime = Math.floor(performance.now() - startTime)
                                createSpan.setAttribute('worker.processCreationTimeMs', processCreationTime)
                            }
                            finally {
                                createSpan.end()
                            }
                        })
                        // The new process must connect back over the socket before it can take work.
                        const connectionStartTime = performance.now()
                        await tracer.startActiveSpan('engineProcessManager.waitForConnection', {
                            attributes: {
                                'worker.id': workerId,
                                'worker.index': workerIndex,
                            },
                        }, async (connectSpan) => {
                            try {
                                const connection = await engineSocketServer.waitForConnect(workerId)
                                const connectionWaitTime = Math.floor(performance.now() - connectionStartTime)
                                connectSpan.setAttribute('worker.connectionWaitTimeMs', connectionWaitTime)
                                if (!connection) {
                                    connectSpan.recordException(new Error('Worker connection failed'))
                                    log.error({
                                        workerIndex,
                                    }, 'Worker connection failed')
                                    throw new Error('Worker connection failed')
                                }
                            }
                            finally {
                                connectSpan.end()
                            }
                        })
                        const totalTime = Math.floor(performance.now() - startTime)
                        span.setAttribute('worker.totalProvisioningTimeMs', totalTime)
                        log.info({
                            workerIndex,
                            timeTaken: `${totalTime}ms`,
                        }, 'Worker connected')
                    }
                    finally {
                        span.end()
                    }
                })
            }
            const result = await processTask(workerIndex, operationType, operation, log, timeoutInSeconds)
            // Keep an await so finally does not run before the task is finished
            return result
        }
        catch (error) {
            log.error({
                error,
            }, 'Error executing task')
            throw error
        }
        finally {
            // Always return the slot and release the semaphore, even on failure.
            if (!isNil(workerIndex)) {
                availableProcessIndexes.push(workerIndex)
            }
            lock.release()
        }
    },
    // Best-effort kill of every pooled process (no-op when never initialized).
    async shutdown(): Promise<void> {
        if (!initialized) {
            return
        }
        for (const worker of processes) {
            worker?.kill()
        }
    },
}
/**
 * Sends one operation to the worker in the given slot and waits for its result.
 *
 * Resolution paths:
 *  - the worker reports a result over the socket (onResult), or
 *  - the worker exits: timeout kills map to TIMEOUT, heap-exhaustion signatures
 *    map to MEMORY_ISSUE, anything else rejects as an internal error.
 * The finally block always unsubscribes, clears the timeout, and (for
 * non-reusable workers) terminates the process and rotates the slot's id.
 */
async function processTask(workerIndex: number, operationType: EngineOperationType, operation: EngineOperation, log: FastifyBaseLogger, timeoutInSeconds: number): Promise<WorkerResult> {
    return tracer.startActiveSpan('engineProcessManager.processTask', {
        attributes: {
            'worker.index': workerIndex,
            'worker.operationType': operationType,
            'worker.timeoutInSeconds': timeoutInSeconds,
        },
    }, async (span) => {
        const worker = processes[workerIndex]
        assertNotNullOrUndefined(worker, 'Worker should not be undefined')
        // Set when the watchdog below fires, so the 'exit' handler can tell a
        // timeout kill apart from a crash.
        let didTimeout = false
        const workerId = processIds[workerIndex]
        let timeoutWorker: NodeJS.Timeout | undefined
        const taskStartTime = performance.now()
        try {
            const result = await new Promise<WorkerResult>((resolve, reject) => {
                let stdError = ''
                let stdOut = ''
                // Watchdog: hard-kill the worker when it exceeds its time budget;
                // the subsequent 'exit' event resolves with TIMEOUT.
                // eslint-disable-next-line @typescript-eslint/no-misused-promises
                timeoutWorker = setTimeout(async () => {
                    didTimeout = true
                    await forceTerminate(worker, log)
                    processes[workerIndex] = undefined
                }, timeoutInSeconds * 1000)
                const onResult = (result: EngineResponse<unknown>) => {
                    const executionTimeMs = Math.floor(performance.now() - taskStartTime)
                    span.setAttribute('worker.executionTimeMs', executionTimeMs)
                    span.setAttribute('worker.resultStatus', result.status)
                    resolve({
                        engine: result,
                        stdOut,
                        stdError,
                    })
                }
                // Accumulate output streamed over the socket so it can be attached to the result.
                const onStdout = (stdout: EngineStdout) => {
                    stdOut += stdout.message
                }
                const onStderr = (stderr: EngineStderr) => {
                    stdError += stderr.message
                }
                engineSocketServer.subscribe({
                    workerId,
                    onResult,
                    onStdout,
                    onStderr,
                    ...engineSocketHandlers(log),
                })
                worker.on('error', (error) => {
                    log.info({
                        error,
                    }, 'Worker returned something in stderr')
                    span.recordException(error)
                    reject({ status: EngineResponseStatus.INTERNAL_ERROR, error })
                })
                worker.on('exit', (code, signal) => {
                    // Heuristic for V8 heap exhaustion: known stderr messages, abort
                    // exit code 134, or SIGABRT/SIGKILL.
                    const isRamIssue = stdError.includes('JavaScript heap out of memory') || stdError.includes('Allocation failed - JavaScript heap out of memory') || (code === 134 || signal === 'SIGABRT' || signal === 'SIGKILL')
                    log.error({
                        stdError,
                        stdOut,
                        workerIndex,
                        code,
                        isRamIssue,
                        signal,
                    }, 'Worker exited')
                    span.setAttribute('worker.exitCode', code ?? -1)
                    span.setAttribute('worker.exitSignal', signal ?? 'none')
                    if (didTimeout) {
                        span.setAttribute('worker.didTimeout', true)
                        resolve({
                            engine: {
                                status: EngineResponseStatus.TIMEOUT,
                                response: {},
                            },
                            stdError,
                            stdOut,
                        })
                    }
                    else if (isRamIssue) {
                        span.setAttribute('worker.isRamIssue', true)
                        resolve({
                            engine: {
                                status: EngineResponseStatus.MEMORY_ISSUE,
                                response: {},
                            },
                            stdError,
                            stdOut,
                        })
                    }
                    else {
                        span.recordException(new Error(`Worker exited with code ${code} and signal ${signal}`))
                        reject({
                            status: EngineResponseStatus.INTERNAL_ERROR,
                            error: 'Worker exited with code ' + code + ' and signal ' + signal,
                            stdError,
                            stdOut,
                        })
                    }
                })
                log.info({
                    workerIndex,
                    timeoutInSeconds,
                }, 'Sending operation to worker')
                engineSocketServer.send(workerId, { operation, operationType })
            })
            span.end()
            return result
        }
        catch (error) {
            log.error({
                error,
            }, 'Worker throw unexpected error')
            span.recordException(error as Error)
            span.end()
            throw error
        }
        finally {
            // Detach everything attached above so the next task starts clean.
            engineSocketServer.unsubscribe(workerId)
            worker.removeAllListeners('exit')
            worker.removeAllListeners('error')
            worker.removeAllListeners('message')
            if (!isNil(timeoutWorker)) {
                clearTimeout(timeoutWorker)
            }
            // Single-use workers are torn down after every task and the slot id rotated.
            if (!isWorkerReusable()) {
                if (!isNil(processes[workerIndex])) {
                    await forceTerminate(processes[workerIndex], log)
                }
                processes[workerIndex] = undefined
                processIds[workerIndex] = nanoid()
            }
            log.debug({
                workerIndex,
            }, 'Releasing worker')
        }
    })
}
/** Pulls the flow version id out of an engine operation, when the operation type carries one. */
function getFlowVersionId(operation: EngineOperation, type: EngineOperationType): string | undefined {
    switch (type) {
        case EngineOperationType.EXTRACT_PIECE_METADATA:
        case EngineOperationType.EXECUTE_VALIDATE_AUTH:
            return undefined
        case EngineOperationType.EXECUTE_FLOW:
            return (operation as ExecuteFlowOperation).flowVersion.id
        case EngineOperationType.EXECUTE_TRIGGER_HOOK:
            return (operation as ExecuteTriggerOperation<TriggerHookType>).flowVersion.id
        case EngineOperationType.EXECUTE_PROPERTY:
            return (operation as ExecutePropsOptions).flowVersion?.id
    }
}
/**
 * Kills a child process and its entire descendant tree with SIGKILL.
 * Always resolves once tree-kill reports back; kill failures are logged, not thrown.
 */
async function forceTerminate(childProcess: ChildProcess, log: FastifyBaseLogger): Promise<void> {
    const { pid } = childProcess
    if (!pid) {
        throw new Error('No PID found for child process')
    }
    return new Promise<void>((resolve) => {
        treeKill(pid, 'SIGKILL', (err) => {
            if (err) {
                log.error({ pid, error: err }, 'Failed to kill child process tree')
            }
            else {
                log.info({ pid }, 'Killed child process tree')
            }
            resolve()
        })
    })
}
/**
 * Decides whether an engine process may serve more than one task.
 * Development always gets a fresh process, dedicated workers always reuse,
 * and otherwise reuse is allowed only under execution modes considered trusted.
 */
function isWorkerReusable(): boolean {
    const settings = workerMachine.getSettings()
    if (settings.ENVIRONMENT === ApEnvironment.DEVELOPMENT) {
        return false
    }
    if (workerMachine.isDedicatedWorker()) {
        return true
    }
    const trustedModes: ExecutionMode[] = [ExecutionMode.SANDBOX_CODE_ONLY, ExecutionMode.UNSANDBOXED]
    return trustedModes.includes(settings.EXECUTION_MODE as ExecutionMode)
}

View File

@@ -0,0 +1,130 @@
import { pubsubFactory } from '@activepieces/server-shared'
import { EngineHttpResponse, FlowRunStatus, isFlowRunStateTerminal, isNil, SendFlowResponseRequest, StepRunResponse, UpdateRunProgressRequest, WebsocketServerEvent } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { StatusCodes } from 'http-status-codes'
import { appSocket } from '../../app-socket'
import { runsMetadataQueue } from '../../flow-worker'
import { workerRedisConnections } from '../../utils/worker-redis'
// Redis pub/sub client used to push engine responses to the worker server that owns each request.
const pubsub = pubsubFactory(workerRedisConnections.create)
/**
 * Handlers for messages the engine process sends back over the worker socket:
 * flow HTTP responses, user-interaction results, run progress, and step-level
 * test progress.
 */
export const engineSocketHandlers = (log: FastifyBaseLogger) => ({
    // Forwards the flow's HTTP response to the worker server holding the request open.
    sendFlowResponse: async (request: SendFlowResponseRequest): Promise<void> => {
        await publishEngineResponse(log, {
            requestId: request.httpRequestId,
            workerServerId: request.workerHandlerId,
            response: request.runResponse,
        })
    },
    // Relays a user-interaction result straight through to its worker server.
    sendUserInteractionResponse: async <T>(request: PublishEngineResponseRequest<T>): Promise<void> => {
        const { requestId, workerServerId, response } = request
        await publishEngineResponse(log, { requestId, workerServerId, response })
    },
    // Persists run progress, answers any waiting synchronous HTTP request, and
    // emits websocket test events when a specific step is being tested.
    updateRunProgress: async (request: UpdateRunProgressRequest): Promise<void> => {
        const { runId, projectId, workerHandlerId, status, tags, httpRequestId, stepNameToTest, logsFileId, failedStep, startTime, finishTime, stepResponse, pauseMetadata, stepsCount } = request
        // RUNNING / SUCCEEDED / PAUSED never answer the waiting HTTP request from here.
        const statusesWithoutHttpReply = [FlowRunStatus.RUNNING, FlowRunStatus.SUCCEEDED, FlowRunStatus.PAUSED]
        if (!statusesWithoutHttpReply.includes(status) && !isNil(workerHandlerId) && !isNil(httpRequestId)) {
            await publishEngineResponse(log, {
                requestId: httpRequestId,
                workerServerId: workerHandlerId,
                response: await getFlowResponse(status),
            })
        }
        await runsMetadataQueue.add({
            id: runId,
            status,
            failedStep,
            startTime,
            finishTime,
            logsFileId,
            projectId,
            tags,
            pauseMetadata,
            stepsCount,
        })
        if (!isNil(stepNameToTest) && !isNil(stepResponse)) {
            const terminal = isFlowRunStateTerminal({
                status,
                ignoreInternalError: false,
            })
            const event = terminal
                ? WebsocketServerEvent.EMIT_TEST_STEP_FINISHED
                : WebsocketServerEvent.EMIT_TEST_STEP_PROGRESS
            await appSocket(log).emitWithAck(event, { projectId, ...stepResponse })
        }
    },
    // Streams intermediate output of a step test run to the project's websocket room.
    updateStepProgress: async (request: UpdateStepProgressRequest): Promise<void> => {
        await appSocket(log).emitWithAck(WebsocketServerEvent.EMIT_TEST_STEP_PROGRESS, { projectId: request.projectId, ...request.stepResponse })
    },
})
// Payload for step-level progress pushes emitted while a step test is running.
type UpdateStepProgressRequest = {
    projectId: string
    stepResponse: StepRunResponse
}
/** Publishes an engine response on the redis channel owned by the given worker server. */
async function publishEngineResponse<T>(log: FastifyBaseLogger, request: PublishEngineResponseRequest<T>): Promise<void> {
    log.info({ requestId: request.requestId }, '[engineResponsePublisher#publishEngineResponse]')
    const payload: EngineResponseWithId<T> = {
        requestId: request.requestId,
        response: request.response,
    }
    await pubsub.publish(`engine-run:sync:${request.workerServerId}`, JSON.stringify(payload))
}
/**
 * Maps a terminal (non-success) flow run status to the HTTP response returned
 * to a synchronous caller. Statuses expected to be handled earlier throw.
 */
async function getFlowResponse(status: FlowRunStatus): Promise<EngineHttpResponse> {
    const internalError = (message: string): EngineHttpResponse => ({
        status: StatusCodes.INTERNAL_SERVER_ERROR,
        body: { message },
        headers: {},
    })
    switch (status) {
        case FlowRunStatus.INTERNAL_ERROR:
            return internalError('An internal error has occurred')
        case FlowRunStatus.FAILED:
        case FlowRunStatus.MEMORY_LIMIT_EXCEEDED:
            return internalError('The flow has failed and there is no response returned')
        case FlowRunStatus.TIMEOUT:
            return {
                status: StatusCodes.GATEWAY_TIMEOUT,
                body: { message: 'The request took too long to reply' },
                headers: {},
            }
        case FlowRunStatus.QUOTA_EXCEEDED:
            return {
                status: StatusCodes.NO_CONTENT,
                body: {},
                headers: {},
            }
        // Case that should be handled before
        default:
            throw new Error(`Unexpected flow run status: ${status}`)
    }
}
// Envelope for publishing any engine response to a specific worker server.
type PublishEngineResponseRequest<T> = {
    requestId: string
    workerServerId: string
    response: T
}
// Wire format written to the redis channel: the response tagged with its request id.
type EngineResponseWithId<T> = { requestId: string, response: T }

View File

@@ -0,0 +1,25 @@
import { ChildProcess } from 'child_process'
// Contract every engine-process factory implements: spawn one engine child process.
export type EngineProcess = {
    create: (params: CreateEngineParams) => Promise<ChildProcess>
}
// Static spawn configuration shared by all engine processes.
export type EngineProcessOptions = {
    env: Record<string, string | undefined>
    resourceLimits: {
        maxOldGenerationSizeMb: number
        maxYoungGenerationSizeMb: number
        stackSizeMb: number
    }
    execArgv: string[]
}
// Per-spawn inputs for a single engine process.
type CreateEngineParams = {
    workerId: string
    workerIndex: number
    flowVersionId: string | undefined
    platformId: string
    options: EngineProcessOptions
    // Whether the spawned process is expected to serve multiple tasks.
    reusable: boolean
}

View File

@@ -0,0 +1,19 @@
import { ExecutionMode } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { workerMachine } from '../../../utils/machine'
import { EngineProcess } from './engine-factory-types'
import { isolateSandboxProcess } from './isolate-sandbox-process'
import { noSandboxProcess } from './no-sandbox-process'
// Maps each execution mode to the process factory that provides its isolation level.
const factory = {
    [ExecutionMode.UNSANDBOXED]: noSandboxProcess,
    [ExecutionMode.SANDBOX_PROCESS]: isolateSandboxProcess,
    [ExecutionMode.SANDBOX_CODE_ONLY]: noSandboxProcess,
    [ExecutionMode.SANDBOX_CODE_AND_PROCESS]: isolateSandboxProcess,
}
/** Picks the engine process factory matching this machine's configured execution mode. */
export const engineProcessFactory = (log: FastifyBaseLogger): EngineProcess => {
    const mode = workerMachine.getSettings().EXECUTION_MODE as ExecutionMode
    const buildProcessFactory = factory[mode]
    return buildProcessFactory(log)
}

View File

@@ -0,0 +1,115 @@
import { spawn } from 'node:child_process'
import path from 'node:path'
import { arch } from 'node:process'
import { execPromise, fileSystemUtils } from '@activepieces/server-shared'
import { isNil } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { registryPieceManager } from '../../../cache/pieces/production/registry-piece-manager'
import { GLOBAL_CACHE_COMMON_PATH, GLOBAL_CODE_CACHE_PATH } from '../../../cache/worker-cache'
import { workerMachine } from '../../../utils/machine'
import { EngineProcess } from './engine-factory-types'
/** Resolves the bundled isolate binary name for the current CPU architecture (ARM builds differ). */
const getIsolateExecutableName = (): string => {
    switch (arch) {
        case 'arm':
        case 'arm64':
            return 'isolate-arm'
        default:
            return 'isolate'
    }
}
// Resolved once at module load: the worker's working directory, the node binary
// running this process, and the bundled isolate binary for this architecture.
const currentDir = process.cwd()
const nodeExecutablePath = process.execPath
const isolateBinaryPath = path.resolve(currentDir, 'packages/server/api/src/assets', getIsolateExecutableName())
export const isolateSandboxProcess = (log: FastifyBaseLogger): EngineProcess => ({
create: async (params) => {
const { workerId, workerIndex, options } = params
await execPromise(`${isolateBinaryPath} --box-id=${workerIndex} --cleanup`)
await execPromise(`${isolateBinaryPath} --box-id=${workerIndex} --init`)
const propagatedEnvVars = getEnvironmentVariables(options.env, workerId)
const dirsToBindArgs: string[] = await getDirsToBindArgs(log, params.flowVersionId, params.platformId, params.reusable)
const args = [
...dirsToBindArgs,
'--share-net',
`--box-id=${workerIndex}`,
'--processes',
'--chdir=/root',
'--run',
...propagatedEnvVars,
nodeExecutablePath,
'/root/main.js',
]
log.debug({ command: `${isolateBinaryPath} ${args.join(' ')}` }, '[IsolateSandboxProcess#create] Executing command')
const isolateProcess = spawn(isolateBinaryPath, args, {
shell: true,
})
isolateProcess.stdout?.on('data', (data) => {
process.stdout.write(data)
})
isolateProcess.stderr?.on('data', (data) => {
process.stderr.write(data)
})
return isolateProcess
},
})
/**
 * Builds the isolate `--env=KEY='value'` flags: caller-provided variables first
 * (insertion order), then the sandbox overrides (code dir, HOME, worker id).
 * NOTE(review): values are wrapped in single quotes verbatim — assumes values
 * never contain single quotes; confirm callers guarantee this.
 */
function getEnvironmentVariables(env: Record<string, string | undefined>, workerId: string): string[] {
    const combined: Record<string, string | undefined> = {
        ...env,
        AP_BASE_CODE_DIRECTORY: '/codes',
        HOME: '/tmp/',
        WORKER_ID: workerId,
    }
    const flags: string[] = []
    for (const [name, value] of Object.entries(combined)) {
        flags.push(`--env=${name}='${value}'`)
    }
    return flags
}
/**
 * Assembles the isolate `--dir` bindings for one engine box: system binaries,
 * /etc assets, the common cache as /root, plus (when available) the flow's
 * code artifacts, the platform's custom pieces, and locally developed pieces.
 */
async function getDirsToBindArgs(log: FastifyBaseLogger, flowVersionId: string | undefined, platformId: string, reusable: boolean): Promise<string[]> {
    const etcDir = path.resolve('./packages/server/api/src/assets/etc/')
    const dirsToBind = [
        '--dir=/usr/bin/',
        `--dir=/etc/=${etcDir}`,
        `--dir=/root=${path.resolve(GLOBAL_CACHE_COMMON_PATH)}`,
    ]
    const codePieceDirectoryToBind = await getCodePieceDirectoryToBind(flowVersionId, reusable)
    if (!isNil(codePieceDirectoryToBind)) {
        dirsToBind.push(codePieceDirectoryToBind)
    }
    // `:maybe` tells isolate to skip the binding when the host directory is absent.
    const customPiecesPath = registryPieceManager(log).getCustomPiecesPath(platformId)
    if (customPiecesPath) {
        dirsToBind.push(`--dir=/node_modules=${path.resolve(customPiecesPath, 'node_modules')}:maybe`)
        dirsToBind.push(`--dir=/pieces=${path.resolve(customPiecesPath, 'pieces')}:maybe`)
    }
    const devPieces = workerMachine.getSettings().DEV_PIECES
    if (devPieces.length > 0) {
        const basePath = path.resolve(__dirname.split('/dist')[0])
        // NOTE(review): the outside path gets an extra leading '/' prepended to an
        // already-absolute basePath (e.g. '//home/...'); looks unintended — confirm
        // against the isolate --dir=inside=outside syntax.
        dirsToBind.push(
            `--dir=${path.join(basePath, 'dist')}=/${path.join(basePath, 'dist')}:maybe`,
            `--dir=${path.join(basePath, 'node_modules')}=/${path.join(basePath, 'node_modules')}:maybe`,
        )
    }
    return dirsToBind
}
/**
 * Chooses the `--dir` binding that exposes compiled code artifacts inside the box.
 * Reusable workers mount the whole code cache; single-use workers mount only
 * their flow version's directory, and only when it exists on disk.
 */
async function getCodePieceDirectoryToBind(flowVersionId: string | undefined, reusable: boolean): Promise<string | undefined> {
    if (reusable) {
        return `--dir=/codes=${path.resolve(GLOBAL_CODE_CACHE_PATH)}`
    }
    if (isNil(flowVersionId)) {
        return undefined
    }
    const versionCachePath = path.resolve(GLOBAL_CODE_CACHE_PATH, flowVersionId)
    const cacheExists = await fileSystemUtils.fileExists(versionCachePath)
    if (!cacheExists) {
        return undefined
    }
    return `--dir=${path.join('/codes', flowVersionId)}=${versionCachePath}`
}

View File

@@ -0,0 +1,23 @@
import { fork } from 'child_process'
import { FastifyBaseLogger } from 'fastify'
import { ENGINE_PATH, GLOBAL_CODE_CACHE_PATH } from '../../../cache/worker-cache'
import { EngineProcess } from './engine-factory-types'
/**
 * Engine process factory with no sandboxing: the engine is forked directly as
 * a node child process, with memory limits applied through V8 flags.
 */
export const noSandboxProcess = (_log: FastifyBaseLogger): EngineProcess => ({
    create: async (params) => {
        const { options, workerId } = params
        const execArgv = [
            // IMPORTANT DO NOT REMOVE THIS ARGUMENT: https://github.com/laverdet/isolated-vm/issues/424
            '--no-node-snapshot',
            `--max-old-space-size=${options.resourceLimits.maxOldGenerationSizeMb}`,
            `--max-semi-space-size=${options.resourceLimits.maxYoungGenerationSizeMb}`,
        ]
        const env = {
            ...options.env,
            AP_BASE_CODE_DIRECTORY: GLOBAL_CODE_CACHE_PATH,
            WORKER_ID: workerId,
        }
        return fork(ENGINE_PATH, [], {
            ...options,
            execArgv,
            env,
        })
    },
})

View File

@@ -0,0 +1,51 @@
import { assertNotNullOrUndefined, ConsumeJobResponse, ConsumeJobResponseStatus, PollingJobData, ProgressUpdateType, RunEnvironment, TriggerPayload, TriggerRunStatus } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { workerApiService } from '../../api/server-api.service'
import { flowWorkerCache } from '../../cache/flow-worker-cache'
import { triggerHooks } from '../../utils/trigger-utils'
/**
 * Executes a polling-trigger job: resolves the flow version, extracts trigger
 * payloads through the engine, and starts one production run per payload.
 * Engine-side extraction failures are reported as INTERNAL_ERROR.
 */
export const executeTriggerExecutor = (log: FastifyBaseLogger) => ({
    async executeTrigger({ jobId, data, engineToken, timeoutInSeconds }: ExecuteTriggerParams): Promise<ConsumeJobResponse> {
        const flowVersion = await flowWorkerCache(log).getVersion({
            engineToken,
            flowVersionId: data.flowVersionId,
        })
        assertNotNullOrUndefined(flowVersion, 'flowVersion')
        const extraction = await triggerHooks(log).extractPayloads(engineToken, {
            projectId: data.projectId,
            platformId: data.platformId,
            flowVersion,
            payload: {} as TriggerPayload,
            simulate: false,
            jobId,
            timeoutInSeconds,
        })
        if (extraction.status === TriggerRunStatus.INTERNAL_ERROR) {
            return {
                status: ConsumeJobResponseStatus.INTERNAL_ERROR,
                errorMessage: extraction.errorMessage,
            }
        }
        await workerApiService().startRuns({
            flowVersionId: data.flowVersionId,
            platformId: data.platformId,
            progressUpdateType: ProgressUpdateType.NONE,
            projectId: data.projectId,
            payloads: extraction.payloads,
            environment: RunEnvironment.PRODUCTION,
        })
        return {
            status: ConsumeJobResponseStatus.OK,
        }
    },
})
// Inputs for one polling-trigger job execution.
type ExecuteTriggerParams = {
    jobId: string
    data: PollingJobData
    engineToken: string
    timeoutInSeconds: number
}

View File

@@ -0,0 +1,261 @@
import { exceptionHandler, pinoLogging } from '@activepieces/server-shared'
import { ActivepiecesError, BeginExecuteFlowOperation, ConsumeJobResponse, ConsumeJobResponseStatus, EngineResponseStatus, ErrorCode, ExecuteFlowJobData, ExecutionType, FlowExecutionState, flowExecutionStateKey, FlowRunStatus, FlowStatus, FlowVersion, isNil, ResumeExecuteFlowOperation, ResumePayload, RunEnvironment } from '@activepieces/shared'
import { trace } from '@opentelemetry/api'
import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
import { flowRunLogs } from '../../api/server-api.service'
import { flowWorkerCache } from '../../cache/flow-worker-cache'
import { engineRunner } from '../../compute'
import { engineSocketHandlers } from '../../compute/process/engine-socket-handlers'
import { runsMetadataQueue } from '../../flow-worker'
import { workerRedisConnections } from '../../utils/worker-redis'
const tracer = trace.getTracer('flow-job-executor')
// Fields injected later by the engine runner itself, so job preparation must omit them.
type EngineConstants = 'internalApiUrl' | 'publicApiUrl' | 'engineToken'
/**
 * Builds the engine operation for a flow execution job.
 *
 * Previously captured step state is restored when resuming, or when retrying a
 * BEGIN job (attempsStarted > 1), by downloading the prior execution log.
 * The fields shared between BEGIN and RESUME are assembled once to keep both
 * branches in sync.
 *
 * @param flowVersion resolved flow version to execute
 * @param jobData queued job payload
 * @param attempsStarted number of attempts started so far, including this one
 * @param timeoutInSeconds engine-side execution timeout
 */
async function prepareInput(
    flowVersion: FlowVersion,
    jobData: ExecuteFlowJobData,
    attempsStarted: number,
    timeoutInSeconds: number,
): Promise<
    | Omit<BeginExecuteFlowOperation, EngineConstants>
    | Omit<ResumeExecuteFlowOperation, EngineConstants>
> {
    const previousExecutionFile = (jobData.executionType === ExecutionType.RESUME || attempsStarted > 1) ? await flowRunLogs.get(jobData.logsUploadUrl) : null
    const steps = !isNil(previousExecutionFile) ? previousExecutionFile?.executionState?.steps : {}
    // Fields common to both BEGIN and RESUME operations.
    const common = {
        platformId: jobData.platformId,
        flowVersion,
        flowRunId: jobData.runId,
        projectId: jobData.projectId,
        serverHandlerId: jobData.synchronousHandlerId ?? null,
        executionState: {
            steps,
        },
        runEnvironment: jobData.environment,
        httpRequestId: jobData.httpRequestId ?? null,
        progressUpdateType: jobData.progressUpdateType,
        stepNameToTest: jobData.stepNameToTest ?? null,
        logsUploadUrl: jobData.logsUploadUrl,
        logsFileId: jobData.logsFileId,
        timeoutInSeconds,
    }
    switch (jobData.executionType) {
        case ExecutionType.BEGIN: {
            return {
                ...common,
                executionType: ExecutionType.BEGIN,
                triggerPayload: jobData.payload,
                sampleData: jobData.sampleData,
                executeTrigger: jobData.executeTrigger ?? false,
            }
        }
        case ExecutionType.RESUME: {
            return {
                ...common,
                executionType: ExecutionType.RESUME,
                resumePayload: jobData.payload as ResumePayload,
            }
        }
    }
}
/**
 * Reports a run as finished with the given failure status, answering any
 * waiting synchronous HTTP request and persisting the run's final state.
 * Shared by the timeout / memory / internal-error handlers below.
 */
async function reportRunFailure(
    status: FlowRunStatus,
    jobData: ExecuteFlowJobData,
    log: FastifyBaseLogger,
): Promise<void> {
    await engineSocketHandlers(log).updateRunProgress({
        finishTime: dayjs().toISOString(),
        status,
        httpRequestId: jobData.httpRequestId,
        progressUpdateType: jobData.progressUpdateType,
        workerHandlerId: jobData.synchronousHandlerId,
        runId: jobData.runId,
        projectId: jobData.projectId,
    })
}
/** Marks the run as having exceeded its memory limit. */
async function handleMemoryIssueError(
    jobData: ExecuteFlowJobData,
    log: FastifyBaseLogger,
): Promise<void> {
    await reportRunFailure(FlowRunStatus.MEMORY_LIMIT_EXCEEDED, jobData, log)
}
/** Marks the run as timed out. */
async function handleTimeoutError(
    jobData: ExecuteFlowJobData,
    log: FastifyBaseLogger,
): Promise<void> {
    await reportRunFailure(FlowRunStatus.TIMEOUT, jobData, log)
}
/** Marks the run as failed with an internal error. */
async function handleInternalError(
    jobData: ExecuteFlowJobData,
    log: FastifyBaseLogger,
): Promise<void> {
    await reportRunFailure(FlowRunStatus.INTERNAL_ERROR, jobData, log)
}
/**
 * Consumes one flow-execution job: skips disabled flows, records the run as
 * RUNNING, hands the prepared operation to the engine, and maps engine
 * failures (timeout / memory / internal) to the run's terminal status.
 */
export const flowJobExecutor = (log: FastifyBaseLogger) => ({
    async executeFlow({
        jobData,
        attemptsStarted,
        engineToken,
        timeoutInSeconds,
    }: ExecuteFlowOptions): Promise<ConsumeJobResponse> {
        return tracer.startActiveSpan('flowJobExecutor.executeFlow', {
            attributes: {
                'flow.runId': jobData.runId,
                'flow.flowVersionId': jobData.flowVersionId,
                'flow.projectId': jobData.projectId,
                'flow.executionType': jobData.executionType,
            },
        }, async (span) => {
            try {
                // Production jobs for disabled/deleted flows are acknowledged without running.
                const shouldSkip = await shouldSkipDisabledFlow(jobData)
                if (shouldSkip) {
                    log.info({
                        message: '[flowJobExecutor] Skipping flow because it is disabled',
                        flowId: jobData.flowId,
                        projectId: jobData.projectId,
                    })
                    return {
                        status: ConsumeJobResponseStatus.OK,
                    }
                }
                const flowVersion = await flowWorkerCache(log).getVersion({
                    engineToken,
                    flowVersionId: jobData.flowVersionId,
                })
                // A missing flow version is treated as done so the job is not retried.
                if (isNil(flowVersion)) {
                    return {
                        status: ConsumeJobResponseStatus.OK,
                    }
                }
                // Record the run as RUNNING; only a fresh BEGIN sets the start time.
                await runsMetadataQueue.add({
                    id: jobData.runId,
                    projectId: jobData.projectId,
                    startTime: jobData.executionType === ExecutionType.BEGIN ? dayjs().toISOString() : undefined,
                    status: FlowRunStatus.RUNNING,
                })
                const runLog = pinoLogging.createRunContextLog({
                    log,
                    runId: jobData.runId,
                    webhookId: jobData.httpRequestId,
                    flowId: flowVersion.flowId,
                    flowVersionId: flowVersion.id,
                })
                const input = await prepareInput(
                    flowVersion,
                    jobData,
                    attemptsStarted,
                    timeoutInSeconds,
                )
                const { result, status, delayInSeconds } = await engineRunner(runLog).executeFlow(
                    engineToken,
                    input,
                )
                if (status === EngineResponseStatus.INTERNAL_ERROR) {
                    span.recordException(new Error(`Engine internal error: ${JSON.stringify(result)}`))
                    throw new ActivepiecesError({
                        code: ErrorCode.ENGINE_OPERATION_FAILURE,
                        params: {
                            message: JSON.stringify(result),
                        },
                    })
                }
                // A positive delay asks the queue to re-deliver this job later (paused run).
                if (!isNil(delayInSeconds) && delayInSeconds > 0) {
                    span.setAttribute('flow.delayInSeconds', delayInSeconds)
                    return {
                        status: ConsumeJobResponseStatus.OK,
                        delayInSeconds,
                    }
                }
                return { status: ConsumeJobResponseStatus.OK }
            }
            catch (e) {
                const isTimeoutError =
                    e instanceof ActivepiecesError &&
                    e.error.code === ErrorCode.EXECUTION_TIMEOUT
                const isMemoryIssueError =
                    e instanceof ActivepiecesError &&
                    e.error.code === ErrorCode.MEMORY_ISSUE
                // Timeout and memory failures finalize the run but acknowledge the job
                // (no retry); anything else is reported and rethrown for the queue.
                if (isTimeoutError) {
                    span.setAttribute('error.type', 'timeout')
                    await handleTimeoutError(jobData, log)
                    return {
                        status: ConsumeJobResponseStatus.OK,
                    }
                }
                else if (isMemoryIssueError) {
                    span.setAttribute('error.type', 'memory')
                    await handleMemoryIssueError(jobData, log)
                    return {
                        status: ConsumeJobResponseStatus.OK,
                    }
                }
                else {
                    span.recordException(e as Error)
                    await handleInternalError(jobData, log)
                    exceptionHandler.handle(e, log)
                    throw e
                }
            }
            finally {
                span.end()
            }
        })
    },
})
async function shouldSkipDisabledFlow(jobData: ExecuteFlowJobData): Promise<boolean> {
if (jobData.environment === RunEnvironment.TESTING) {
return false
}
const redisConnection = await workerRedisConnections.useExisting()
const flowExecutionStateString = await redisConnection.get(flowExecutionStateKey(jobData.flowId))
if (isNil(flowExecutionStateString)) {
return false
}
const flowExecutionState = JSON.parse(flowExecutionStateString) as FlowExecutionState
if (!flowExecutionState.exists || flowExecutionState.flow.status === FlowStatus.DISABLED) {
return true
}
return false
}
// Inputs for one flow-execution job.
type ExecuteFlowOptions = {
    jobData: ExecuteFlowJobData
    attemptsStarted: number
    engineToken: string
    timeoutInSeconds: number
}

View File

@@ -0,0 +1,47 @@
import { ConsumeJobResponse, ConsumeJobResponseStatus, isNil, RenewWebhookJobData, TriggerHookType } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { flowWorkerCache } from '../../cache/flow-worker-cache'
import { engineRunner } from '../../compute'
import { workerMachine } from '../../utils/machine'
import { webhookUtils } from '../../utils/webhook-utils'
/**
 * Handles RENEW_WEBHOOK jobs: runs the trigger's RENEW hook for a flow
 * version inside the engine sandbox.
 */
export const renewWebhookExecutor = (log: FastifyBaseLogger) => ({
    async renewWebhook({ data, engineToken, timeoutInSeconds }: RenewWebhookParams): Promise<ConsumeJobResponse> {
        const { flowVersionId } = data
        const flowVersion = await flowWorkerCache(log).getVersion({
            engineToken,
            flowVersionId,
        })
        // The flow version may no longer exist by the time the scheduled
        // renewal fires; treat that as a successful no-op.
        if (isNil(flowVersion)) {
            return {
                status: ConsumeJobResponseStatus.OK,
            }
        }
        log.info({ flowVersionId: data.flowVersionId }, '[FlowQueueConsumer#consumeRenewWebhookJob]')
        // Renewal always targets the real (non-simulated) webhook URL.
        const simulate = false
        await engineRunner(log).executeTrigger(engineToken, {
            platformId: data.platformId,
            hookType: TriggerHookType.RENEW,
            flowVersion,
            webhookUrl: await webhookUtils(log).getWebhookUrl({
                flowId: flowVersion.flowId,
                simulate,
                publicApiUrl: workerMachine.getPublicApiUrl(),
            }),
            test: simulate,
            projectId: data.projectId,
            timeoutInSeconds,
        })
        return {
            status: ConsumeJobResponseStatus.OK,
        }
    },
})
/** Queue payload plus worker-side context for renewWebhook. */
type RenewWebhookParams = {
    data: RenewWebhookJobData
    engineToken: string
    timeoutInSeconds: number
}

View File

@@ -0,0 +1,72 @@
import { AppConnectionValue, assertNotNullOrUndefined, UserInteractionJobData, WorkerJobType } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { flowWorkerCache } from '../../cache/flow-worker-cache'
import { engineRunner } from '../../compute'
import { EngineHelperResponse, EngineHelperResult } from '../../compute/engine-runner-types'
import { engineSocketHandlers } from '../../compute/process/engine-socket-handlers'
import { workerMachine } from '../../utils/machine'
import { webhookUtils } from '../../utils/webhook-utils'
/**
 * Executes interactive, user-initiated jobs (piece metadata extraction,
 * auth validation, trigger hooks, dynamic property resolution) and pushes
 * the engine's response back to the originating app server over the socket.
 *
 * Note: each switch case narrows the UserInteractionJobData discriminated
 * union on jobType, so variant-specific fields (hookType, propertyName, ...)
 * are only accessed in their matching branch.
 */
export const userInteractionJobExecutor = (log: FastifyBaseLogger) => ({
    async execute(jobData: UserInteractionJobData, engineToken: string, timeoutInSeconds: number): Promise<void> {
        let response: EngineHelperResponse<EngineHelperResult>
        switch (jobData.jobType) {
            case WorkerJobType.EXECUTE_EXTRACT_PIECE_INFORMATION:
                response = await engineRunner(log).extractPieceMetadata({
                    ...jobData.piece,
                    platformId: jobData.platformId,
                    timeoutInSeconds,
                })
                break
            case WorkerJobType.EXECUTE_VALIDATION:
                response = await engineRunner(log).executeValidateAuth(engineToken, {
                    platformId: jobData.platformId,
                    auth: jobData.connectionValue as AppConnectionValue,
                    piece: jobData.piece,
                    timeoutInSeconds,
                })
                break
            case WorkerJobType.EXECUTE_TRIGGER_HOOK: {
                // Trigger hooks need the full flow version to resolve the
                // trigger's settings and webhook URL.
                const flowVersion = await flowWorkerCache(log).getVersion({
                    engineToken,
                    flowVersionId: jobData.flowVersionId,
                })
                assertNotNullOrUndefined(flowVersion, 'flowVersion')
                response = await engineRunner(log).executeTrigger(engineToken, {
                    platformId: jobData.platformId,
                    hookType: jobData.hookType,
                    flowVersion,
                    webhookUrl: await webhookUtils(log).getWebhookUrl({
                        flowId: flowVersion.flowId,
                        simulate: jobData.test,
                        publicApiUrl: workerMachine.getPublicApiUrl(),
                    }),
                    triggerPayload: jobData.triggerPayload,
                    projectId: jobData.projectId,
                    test: jobData.test,
                    timeoutInSeconds,
                })
                break
            }
            case WorkerJobType.EXECUTE_PROPERTY:
                response = await engineRunner(log).executeProp(engineToken, {
                    platformId: jobData.platformId,
                    piece: jobData.piece,
                    flowVersion: jobData.flowVersion,
                    propertyName: jobData.propertyName,
                    actionOrTriggerName: jobData.actionOrTriggerName,
                    input: jobData.input,
                    sampleData: jobData.sampleData,
                    projectId: jobData.projectId,
                    searchValue: jobData.searchValue,
                    timeoutInSeconds,
                })
                break
        }
        // Route the result back to the specific app-server instance that is
        // holding the user's request open (identified by webserverId).
        await engineSocketHandlers(log).sendUserInteractionResponse({
            requestId: jobData.requestId,
            workerServerId: jobData.webserverId,
            response,
        })
    },
})

View File

@@ -0,0 +1,144 @@
import { pinoLogging } from '@activepieces/server-shared'
import {
ConsumeJobResponse,
ConsumeJobResponseStatus,
EventPayload,
FlowVersion,
isNil,
PlatformId,
ProgressUpdateType,
TriggerRunStatus,
WebhookJobData,
} from '@activepieces/shared'
import { trace } from '@opentelemetry/api'
import { FastifyBaseLogger } from 'fastify'
import { workerApiService } from '../../api/server-api.service'
import { flowWorkerCache } from '../../cache/flow-worker-cache'
import { triggerHooks } from '../../utils/trigger-utils'
import { webhookUtils } from '../../utils/webhook-utils'
// OTel tracer for webhook consumption spans.
const tracer = trace.getTracer('webhook-executor')
/**
 * Consumes EXECUTE_WEBHOOK jobs: optionally stores sample data, extracts
 * trigger payloads from the incoming event, and starts a flow run per payload.
 */
export const webhookExecutor = (log: FastifyBaseLogger) => ({
    async consumeWebhook(
        jobId: string,
        data: WebhookJobData,
        engineToken: string,
        timeoutInSeconds: number,
    ): Promise<ConsumeJobResponse> {
        return tracer.startActiveSpan('webhook.executor.consume', {
            attributes: {
                'webhook.jobId': jobId,
                'webhook.flowId': data.flowId,
                'webhook.requestId': data.requestId,
                'webhook.saveSampleData': data.saveSampleData,
                'webhook.execute': data.execute,
                'webhook.environment': data.runEnvironment,
            },
        }, async (span) => {
            try {
                // Attach webhook/flow ids to every log line in this job.
                const webhookLogger = pinoLogging.createWebhookContextLog({
                    log,
                    webhookId: data.requestId,
                    flowId: data.flowId,
                })
                webhookLogger.info('Webhook job executor started')
                const { payload, saveSampleData, flowVersionIdToRun, execute } = data
                const flowVersion = await flowWorkerCache(log).getVersion({
                    engineToken,
                    flowVersionId: flowVersionIdToRun,
                })
                // Flow was deleted since the webhook arrived: succeed as a no-op.
                if (isNil(flowVersion)) {
                    span.setAttribute('webhook.flowNotFound', true)
                    return {
                        status: ConsumeJobResponseStatus.OK,
                    }
                }
                span.setAttribute('webhook.projectId', data.projectId)
                if (saveSampleData) {
                    await handleSampleData(jobId, flowVersion, engineToken, data.projectId, data.platformId, webhookLogger, payload, timeoutInSeconds)
                }
                // When only sample data was requested (test webhooks), stop here.
                const onlySaveSampleData = !execute
                if (onlySaveSampleData) {
                    span.setAttribute('webhook.onlySaveSampleData', true)
                    return {
                        status: ConsumeJobResponseStatus.OK,
                    }
                }
                // Run the trigger to turn the raw event into zero or more run payloads.
                const { payloads, status, errorMessage } = await triggerHooks(log).extractPayloads(engineToken, {
                    jobId,
                    flowVersion,
                    payload,
                    platformId: data.platformId,
                    projectId: data.projectId,
                    simulate: saveSampleData,
                    timeoutInSeconds,
                })
                span.setAttribute('webhook.payloadsCount', payloads.length)
                // INTERNAL_ERROR propagates so the queue can retry the job.
                if (status === TriggerRunStatus.INTERNAL_ERROR) {
                    span.setAttribute('webhook.error', true)
                    span.setAttribute('webhook.errorMessage', errorMessage ?? 'unknown')
                    return {
                        status: ConsumeJobResponseStatus.INTERNAL_ERROR,
                        errorMessage,
                    }
                }
                // One flow run is started per extracted payload.
                await workerApiService().startRuns({
                    flowVersionId: flowVersion.id,
                    projectId: data.projectId,
                    environment: data.runEnvironment,
                    progressUpdateType: ProgressUpdateType.NONE,
                    httpRequestId: data.requestId,
                    payloads,
                    platformId: data.platformId,
                    parentRunId: data.parentRunId,
                    failParentOnFailure: data.failParentOnFailure,
                })
                span.setAttribute('webhook.runsStarted', true)
                return {
                    status: ConsumeJobResponseStatus.OK,
                }
            }
            finally {
                span.end()
            }
        })
    },
})
/**
 * Runs the trigger in simulation mode against the given flow version and
 * stores the extracted payloads as the flow's sample data.
 *
 * @param jobId - Queue job id, forwarded for trigger-run bookkeeping.
 * @param latestFlowVersion - Flow version whose trigger is simulated.
 * @param engineToken - Token the engine uses to call back into the API.
 * @param payload - Raw incoming webhook event.
 */
async function handleSampleData(
    jobId: string,
    latestFlowVersion: FlowVersion,
    engineToken: string,
    projectId: string,
    platformId: PlatformId,
    log: FastifyBaseLogger,
    payload: EventPayload,
    timeoutInSeconds: number,
): Promise<void> {
    const { payloads } = await triggerHooks(log).extractPayloads(engineToken, {
        jobId,
        flowVersion: latestFlowVersion,
        payload,
        platformId,
        projectId,
        simulate: true,
        timeoutInSeconds,
    })
    // Await the save: previously this promise was fired and forgotten, so a
    // failure surfaced as an unhandled rejection and the caller could return
    // before the sample data was actually persisted.
    await webhookUtils(log).savePayloadsAsSampleData({
        flowVersion: latestFlowVersion,
        projectId,
        payloads,
    })
}

View File

@@ -0,0 +1,99 @@
import { assertNotNullOrUndefined, ConsumeJobResponse, ConsumeJobResponseStatus, JobData, WorkerJobType } from '@activepieces/shared'
import { context, propagation, trace } from '@opentelemetry/api'
import { Job } from 'bullmq'
import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
import { workerMachine } from '../utils/machine'
import { tokenUtls } from '../utils/token-utils'
import { executeTriggerExecutor } from './executors/execute-trigger-executor'
import { flowJobExecutor } from './executors/flow-job-executor'
import { renewWebhookExecutor } from './executors/renew-webhook-executor'
import { userInteractionJobExecutor } from './executors/user-interaction-job-executor'
import { webhookExecutor } from './executors/webhook-job-executor'
// OTel tracer for top-level job consumption spans.
const tracer = trace.getTracer('job-consumer')
/**
 * Top-level dispatcher: routes a dequeued job to the executor matching its
 * jobType, wrapped in an OTel span that continues the producer's trace
 * context when one was propagated with the job.
 *
 * NOTE(review): 'jobConsmer' is a misspelling of 'jobConsumer', but it is
 * imported under this name elsewhere (see job-queue-worker), so renaming it
 * here alone would break callers.
 */
export const jobConsmer = (log: FastifyBaseLogger) => ({
    async consumeJob(job: Job<JobData>): Promise<ConsumeJobResponse> {
        const { id: jobId, data: jobData, attemptsStarted } = job
        assertNotNullOrUndefined(jobId, 'jobId')
        const timeoutInSeconds = getTimeoutForWorkerJob(jobData.jobType)
        // NOTE(review): the non-null assertion assumes every job type carries a
        // projectId — TODO confirm for jobs without a project scope.
        const engineToken = await tokenUtls.generateEngineToken({ jobId, projectId: jobData.projectId!, platformId: jobData.platformId })
        // Resume the producer's trace if it attached a serialized context.
        const traceContext = ('traceContext' in jobData && jobData.traceContext) ? jobData.traceContext : {}
        const extractedContext = propagation.extract(context.active(), traceContext)
        return context.with(extractedContext, () => {
            return tracer.startActiveSpan('worker.consumeJob', {
                attributes: {
                    'worker.jobId': jobId,
                    'worker.jobType': jobData.jobType,
                    'worker.attemptsStarted': attemptsStarted,
                    'worker.projectId': jobData.projectId ?? 'unknown',
                },
            }, async (span) => {
                try {
                    switch (jobData.jobType) {
                        // All user-interaction jobs report back over the socket
                        // themselves, so the queue response is always OK.
                        case WorkerJobType.EXECUTE_EXTRACT_PIECE_INFORMATION:
                        case WorkerJobType.EXECUTE_PROPERTY:
                        case WorkerJobType.EXECUTE_VALIDATION:
                        case WorkerJobType.EXECUTE_TRIGGER_HOOK:
                            await userInteractionJobExecutor(log).execute(jobData, engineToken, timeoutInSeconds)
                            span.setAttribute('worker.completed', true)
                            return {
                                status: ConsumeJobResponseStatus.OK,
                            }
                        case WorkerJobType.EXECUTE_FLOW: {
                            const response = await flowJobExecutor(log).executeFlow({ jobData, attemptsStarted, engineToken, timeoutInSeconds })
                            span.setAttribute('worker.completed', true)
                            return response
                        }
                        case WorkerJobType.EXECUTE_POLLING: {
                            const response = await executeTriggerExecutor(log).executeTrigger({
                                jobId,
                                data: jobData,
                                engineToken,
                                timeoutInSeconds,
                            })
                            span.setAttribute('worker.completed', true)
                            return response
                        }
                        case WorkerJobType.RENEW_WEBHOOK: {
                            const response = await renewWebhookExecutor(log).renewWebhook({
                                data: jobData,
                                engineToken,
                                timeoutInSeconds,
                            })
                            span.setAttribute('worker.completed', true)
                            return response
                        }
                        case WorkerJobType.EXECUTE_WEBHOOK: {
                            span.setAttribute('worker.webhookExecution', true)
                            return await webhookExecutor(log).consumeWebhook(jobId, jobData, engineToken, timeoutInSeconds)
                        }
                    }
                }
                finally {
                    span.end()
                }
            })
        })
    },
})
/**
 * Maps a job type to its execution timeout (seconds), read from the worker
 * settings fetched from the app server.
 */
const getTimeoutForWorkerJob = (jobType: WorkerJobType): number => {
    const settings = workerMachine.getSettings()
    let timeoutSetting: number
    if (jobType === WorkerJobType.EXECUTE_FLOW) {
        timeoutSetting = settings.FLOW_TIMEOUT_SECONDS
    }
    else if (jobType === WorkerJobType.EXECUTE_TRIGGER_HOOK || jobType === WorkerJobType.RENEW_WEBHOOK) {
        timeoutSetting = settings.TRIGGER_HOOKS_TIMEOUT_SECONDS
    }
    else {
        // EXECUTE_WEBHOOK, EXECUTE_EXTRACT_PIECE_INFORMATION, EXECUTE_PROPERTY,
        // EXECUTE_VALIDATION, EXECUTE_POLLING all share the trigger timeout.
        timeoutSetting = settings.TRIGGER_TIMEOUT_SECONDS
    }
    return dayjs.duration(timeoutSetting, 'seconds').asSeconds()
}

View File

@@ -0,0 +1,132 @@
import { getPlatformQueueName, QueueName } from '@activepieces/server-shared'
import {
assertNotNullOrUndefined,
ConsumeJobResponseStatus,
ExecutionType,
isNil,
JOB_PRIORITY,
JobData,
LATEST_JOB_DATA_SCHEMA_VERSION,
RATE_LIMIT_PRIORITY,
WorkerJobType,
} from '@activepieces/shared'
import { DelayedError, Worker } from 'bullmq'
import { BullMQOtel } from 'bullmq-otel'
import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
import { workerApiService } from '../api/server-api.service'
import { workerMachine } from '../utils/machine'
import { workerRedisConnections } from '../utils/worker-redis'
import { jobConsmer } from './job-consmer'
import { workerJobRateLimiter } from './worker-job-rate-limiter'
// Singleton BullMQ worker; undefined until start() has run. Typed with
// '| undefined' so the pre-assignment reads below are honest under strict mode.
let worker: Worker<JobData> | undefined
export const jobQueueWorker = (log: FastifyBaseLogger) => ({
    /**
     * Starts the BullMQ worker consuming this machine's queue.
     * Idempotent: a second call while a worker exists is a no-op.
     */
    async start(): Promise<void> {
        if (!isNil(worker)) {
            return
        }
        const isOtpEnabled = workerMachine.getSettings().OTEL_ENABLED
        const queueName = getWorkerQueueName()
        worker = new Worker<JobData>(queueName, async (job, token) => {
            try {
                // Jobs of removed types may still be sitting in Redis from
                // older deployments; drop them silently.
                const deprecatedJobs = ['DELAYED_FLOW']
                if (deprecatedJobs.includes(job.data.jobType)) {
                    log.info({
                        jobId: job.id,
                        jobData: job.data,
                    }, '[jobQueueWorker] Skipping deprecated job')
                    return
                }
                // Upgrade payloads enqueued under an older schema before consuming.
                const isOldSchemaVersion = ('schemaVersion' in job.data ? job.data.schemaVersion : 0) !== LATEST_JOB_DATA_SCHEMA_VERSION
                if (isOldSchemaVersion) {
                    const newJobData = await workerApiService().migrateJob({ jobData: job.data }) as JobData
                    await job.updateData(newJobData)
                }
                const jobId = job.id
                assertNotNullOrUndefined(jobId, 'jobId')
                const { shouldRateLimit } = await workerJobRateLimiter(log).shouldBeLimited(jobId, job.data)
                if (shouldRateLimit) {
                    // Exponential backoff capped at 600s, with jitter (x0.6-1.0)
                    // so throttled jobs don't retry in lockstep.
                    const baseDelay = Math.min(600, 20 * Math.pow(2, job.attemptsStarted))
                    const randomFactor = 0.6 + Math.random() * 0.4
                    const delayInSeconds = Math.round(baseDelay * randomFactor)
                    await job.moveToDelayed(
                        dayjs().add(delayInSeconds, 'seconds').valueOf(),
                        token,
                    )
                    log.info({
                        message: '[jobQueueWorker] Job is throttled and will be retried',
                        jobId,
                        delayInSeconds,
                    })
                    await job.changePriority({
                        priority: JOB_PRIORITY[RATE_LIMIT_PRIORITY],
                    })
                    // DelayedError tells BullMQ the job was intentionally moved
                    // to the delayed set rather than having failed.
                    // (Fixed typo in the message: 'Thie' -> 'This'.)
                    throw new DelayedError(
                        'This job is rate limited and will be retried',
                    )
                }
                const response = await jobConsmer(log).consumeJob(job)
                log.info({
                    message: '[jobQueueWorker] Consumed job',
                    response,
                })
                const isInternalError = response.status === ConsumeJobResponseStatus.INTERNAL_ERROR
                if (isInternalError) {
                    // Throwing makes BullMQ retry the job per its attempts policy.
                    throw new Error(response.errorMessage ?? 'Unknown error')
                }
                const delayInSeconds = response.delayInSeconds
                // A flow run may request to pause itself; re-enqueue it delayed
                // and mark it to RESUME instead of restarting from the top.
                if (!isNil(delayInSeconds) && job.data.jobType === WorkerJobType.EXECUTE_FLOW) {
                    await job.updateData({
                        ...job.data,
                        executionType: ExecutionType.RESUME,
                    })
                    await job.moveToDelayed(dayjs().add(delayInSeconds, 'seconds').valueOf(), job.token)
                    throw new DelayedError('Job requested to be delayed')
                }
            }
            finally {
                // Always release the job's rate-limiter slot, even on failure.
                await workerJobRateLimiter(log).onCompleteOrFailedJob(
                    job.data,
                    job.id,
                )
            }
        },
        {
            connection: await workerRedisConnections.create(),
            telemetry: isOtpEnabled
                ? new BullMQOtel(QueueName.WORKER_JOBS)
                : undefined,
            concurrency: workerMachine.getSettings().WORKER_CONCURRENCY,
            autorun: true,
            stalledInterval: 30000,
            maxStalledCount: 5,
        },
        )
        await worker.waitUntilReady()
        log.info({
            message: 'Job queue worker started',
        })
    },
    /** Gracefully closes the worker. Safe to call before start(). */
    async close(): Promise<void> {
        if (isNil(worker)) {
            return
        }
        await worker.close()
    },
})
/**
 * Picks the queue this worker consumes: a per-platform queue when the worker
 * is dedicated to one platform, otherwise the shared worker queue.
 */
function getWorkerQueueName(): string {
    const dedicatedPlatformId = workerMachine.getSettings().PLATFORM_ID_FOR_DEDICATED_WORKER
    return isNil(dedicatedPlatformId)
        ? QueueName.WORKER_JOBS
        : getPlatformQueueName(dedicatedPlatformId)
}

View File

@@ -0,0 +1,201 @@
import { apDayjsDuration, getPlatformPlanNameKey, getProjectMaxConcurrentJobsKey } from '@activepieces/server-shared'
import { ApEdition, ExecuteFlowJobData, isNil, JobData, PlanName, PlatformId, ProjectId, RunEnvironment, WorkerJobType } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { workerMachine } from '../utils/machine'
import { workerDistributedStore, workerRedisConnections } from '../utils/worker-redis'
// Only flow executions are subject to the per-project concurrency limit.
export const RATE_LIMIT_WORKER_JOB_TYPES = [WorkerJobType.EXECUTE_FLOW]
// Redis set holding this project's in-flight jobs as 'jobId:startTimestamp' entries.
const projectSetKey = (projectId: string): string => `active_jobs_set:${projectId}`
/**
 * Per-project concurrency limiter for EXECUTE_FLOW jobs, backed by a Redis
 * set of 'jobId:timestamp' entries. Both operations run as Lua scripts so
 * membership checks and mutations are atomic.
 */
export const workerJobRateLimiter = (_log: FastifyBaseLogger) => ({
    // Removes a finished job's entry from its project's active-jobs set.
    async onCompleteOrFailedJob(data: JobData, jobId: string | undefined): Promise<void> {
        const projectRateLimiterEnabled = workerMachine.getSettings().PROJECT_RATE_LIMITER_ENABLED
        if (!RATE_LIMIT_WORKER_JOB_TYPES.includes(data.jobType) || !projectRateLimiterEnabled || isNil(jobId)) {
            return
        }
        const castedJob = data as ExecuteFlowJobData
        // Test runs are never tracked (see shouldBeLimited below).
        if (castedJob.environment === RunEnvironment.TESTING) {
            return
        }
        const setKey = projectSetKey(castedJob.projectId)
        const redisConnection = await workerRedisConnections.useExisting()
        await redisConnection.eval(`
            local setKey = KEYS[1]
            local jobId = ARGV[1]
            -- Get all members of the set
            local members = redis.call('SMEMBERS', setKey)
            -- Find and remove the job entry that starts with jobId:
            for i = 1, #members do
                local member = members[i]
                if string.match(member, '^' .. jobId .. ':') then
                    redis.call('SREM', setKey, member)
                end
            end
            return 1
        `,
        1,
        setKey,
        jobId,
        )
    },
    /**
     * Atomically checks whether this job would exceed the project's concurrent
     * job limit; if not, registers it in the active-jobs set. Entries older
     * than the flow timeout (+1 min grace) are garbage-collected in the same
     * script, so crashed workers cannot pin a slot forever.
     */
    async shouldBeLimited(jobId: string | undefined, data: JobData): Promise<{
        shouldRateLimit: boolean
    }> {
        const projectRateLimiterEnabled = workerMachine.getSettings().PROJECT_RATE_LIMITER_ENABLED
        const flowTimeoutInMilliseconds = apDayjsDuration(workerMachine.getSettings().FLOW_TIMEOUT_SECONDS, 'seconds').add(1, 'minute').asMilliseconds()
        if (isNil(data.projectId) || !projectRateLimiterEnabled || isNil(jobId) || !RATE_LIMIT_WORKER_JOB_TYPES.includes(data.jobType)) {
            return {
                shouldRateLimit: false,
            }
        }
        const castedJob = data as ExecuteFlowJobData
        if (castedJob.environment === RunEnvironment.TESTING) {
            return {
                shouldRateLimit: false,
            }
        }
        const maxConcurrentJobsPerProject = await getMaxConcurrentJobsPerProject({
            projectId: data.projectId,
            platformId: data.platformId,
        })
        const setKey = projectSetKey(data.projectId)
        const currentTime = Date.now()
        const jobWithTimestamp = `${jobId}:${currentTime}`
        const redisConnection = await workerRedisConnections.useExisting()
        // Script returns 1 when the job must be throttled, 0 otherwise
        // (including the case where the job is already registered).
        const result = await redisConnection.eval(
            `
            local setKey = KEYS[1]
            local currentTime = tonumber(ARGV[1])
            local timeoutMs = tonumber(ARGV[2])
            local maxJobs = tonumber(ARGV[3])
            local newJobEntry = ARGV[4]
            -- Get all members of the set
            local members = redis.call('SMEMBERS', setKey)
            -- Clean up old jobs and check if job already exists
            local jobIdToCheck = string.match(newJobEntry, '^([^:]+):')
            for i = 1, #members do
                local member = members[i]
                local timestamp = string.match(member, ':(%d+)$')
                local existingJobId = string.match(member, '^([^:]+):')
                -- Clean up old jobs
                if timestamp and (currentTime - tonumber(timestamp)) > timeoutMs then
                    redis.call('SREM', setKey, member)
                -- Check if the job already exists in the set
                elseif existingJobId == jobIdToCheck then
                    return 0 -- fixed
                end
            end
            -- Check current size after cleanup
            local currentSize = redis.call('SCARD', setKey)
            if currentSize >= maxJobs then
                return 1 -- Should rate limit
            end
            -- Add new job with timestamp
            redis.call('SADD', setKey, newJobEntry)
            redis.call('EXPIRE', setKey, math.ceil(timeoutMs / 1000))
            return 0 -- Should not rate limit
            `,
            1,
            setKey,
            currentTime.toString(),
            flowTimeoutInMilliseconds.toString(),
            maxConcurrentJobsPerProject.toString(),
            jobWithTimestamp,
        ) as number
        return {
            shouldRateLimit: result === 1,
        }
    },
})
// Concurrent EXECUTE_FLOW job ceilings per billing plan (cloud edition only).
// Plans missing from this table fall through to other limit sources — see
// getMaxConcurrentJobsPerProject.
const PLAN_CONCURRENT_JOBS_LIMITS: Record<string, number> = {
    [PlanName.STANDARD]: 5,
    [PlanName.APPSUMO_ACTIVEPIECES_TIER1]: 5,
    [PlanName.APPSUMO_ACTIVEPIECES_TIER2]: 5,
    [PlanName.APPSUMO_ACTIVEPIECES_TIER3]: 10,
    [PlanName.APPSUMO_ACTIVEPIECES_TIER4]: 15,
    [PlanName.APPSUMO_ACTIVEPIECES_TIER5]: 20,
    [PlanName.APPSUMO_ACTIVEPIECES_TIER6]: 25,
    [PlanName.ENTERPRISE]: 30,
}
function concurrentJobsFromProject({ projectId, storedValues }: GetConcurrentJobsFromProjectParams): number | null {
if (isNil(projectId)) {
return null
}
const maxConcurrentJobsKey = getProjectMaxConcurrentJobsKey(projectId)
const storedValue = storedValues[maxConcurrentJobsKey]
if (isNil(storedValue)) {
return null
}
return Number(storedValue)
}
function concurrentJobsFromPlan({ platformId, storedValues }: GetConcurrentJobsFromPlanParams): number | null {
if (workerMachine.getSettings().EDITION !== ApEdition.CLOUD) {
return null
}
const planNameKey = getPlatformPlanNameKey(platformId)
const platformPlanName = storedValues[planNameKey]
if (isNil(platformPlanName)) {
return null
}
return PLAN_CONCURRENT_JOBS_LIMITS[platformPlanName] ?? null
}
async function getMaxConcurrentJobsPerProject({ projectId, platformId }: GetMaximumConcurrentJovsPerProjectParams): Promise<number> {
const keys = [getPlatformPlanNameKey(platformId)]
if (!isNil(projectId)) {
keys.push(getProjectMaxConcurrentJobsKey(projectId))
}
const storedValues = await workerDistributedStore.getAll<string>(keys)
const concurrentJobsFromProjectValue = concurrentJobsFromProject({ projectId, storedValues })
if (!isNil(concurrentJobsFromProjectValue)) {
return concurrentJobsFromProjectValue
}
const concurrentJobsFromPlanValue = concurrentJobsFromPlan({ platformId, storedValues })
if (!isNil(concurrentJobsFromPlanValue)) {
return concurrentJobsFromPlanValue
}
return workerMachine.getSettings().MAX_CONCURRENT_JOBS_PER_PROJECT
}
type GetConcurrentJobsFromProjectParams = {
projectId: ProjectId | undefined
storedValues: Record<string, string | null>
}
type GetConcurrentJobsFromPlanParams = {
platformId: PlatformId
storedValues: Record<string, string | null>
}
type GetMaximumConcurrentJovsPerProjectParams = {
platformId: PlatformId
projectId: ProjectId | undefined
}

View File

@@ -0,0 +1,72 @@
import { rejectedPromiseHandler, RunsMetadataQueueConfig, runsMetadataQueueFactory } from '@activepieces/server-shared'
import { WebsocketServerEvent, WorkerSettingsResponse } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { appSocket } from './app-socket'
import { registryPieceManager } from './cache/pieces/production/registry-piece-manager'
import { workerCache } from './cache/worker-cache'
import { engineRunner } from './compute'
import { engineRunnerSocket } from './compute/engine-runner-socket'
import { jobQueueWorker } from './consume/job-queue-worker'
import { workerMachine } from './utils/machine'
import { workerDistributedLock, workerDistributedStore, workerRedisConnections } from './utils/worker-redis'
// Shared runs-metadata queue, wired to the worker's Redis connection factory
// and distributed store. Initialized lazily via initRunsMetadataQueue().
export const runsMetadataQueue = runsMetadataQueueFactory({
    createRedisConnection: workerRedisConnections.create,
    distributedStore: workerDistributedStore,
})
/**
 * Lifecycle of the worker process: connects to the app server, pulls its
 * settings, warms the piece cache, and starts consuming jobs. Startup order
 * inside onConnect matters — settings must be applied before the piece
 * warmup and the queue worker can start.
 */
export const flowWorker = (log: FastifyBaseLogger) => ({
    async init({ workerToken: token, markAsHealthy }: FlowWorkerInitParams): Promise<void> {
        // Best-effort cache cleanup; failures are logged, not fatal.
        rejectedPromiseHandler(workerCache(log).deleteStaleCache(), log)
        await engineRunnerSocket(log).init()
        await appSocket(log).init({
            workerToken: token,
            // Re-runs on every (re)connect to the app server.
            onConnect: async () => {
                const request = await workerMachine.getSystemInfo()
                const response = await appSocket(log).emitWithAck<WorkerSettingsResponse>(WebsocketServerEvent.FETCH_WORKER_SETTINGS, request)
                await workerMachine.init(response, token, log)
                await registryPieceManager(log).warmup()
                await jobQueueWorker(log).start()
                await initRunsMetadataQueue(log)
                await markAsHealthy()
                // Heavier distributed warmup runs after the worker is healthy.
                await registryPieceManager(log).distributedWarmup()
            },
        })
    },
    // Teardown mirrors init: sockets first, then queues, Redis, and sandboxes.
    async close(): Promise<void> {
        await engineRunnerSocket(log).disconnect()
        appSocket(log).disconnect()
        if (runsMetadataQueue.isInitialized()) {
            await runsMetadataQueue.get().close()
        }
        await workerRedisConnections.destroy()
        await workerDistributedLock(log).destroy()
        // Engine workers only exist once settings were received.
        if (workerMachine.hasSettings()) {
            await engineRunner(log).shutdownAllWorkers()
        }
        await jobQueueWorker(log).close()
    },
})
/** Boots the shared runs-metadata queue from the worker's fetched settings. */
async function initRunsMetadataQueue(log: FastifyBaseLogger): Promise<void> {
    const { OTEL_ENABLED, REDIS_FAILED_JOB_RETENTION_DAYS, REDIS_FAILED_JOB_RETENTION_MAX_COUNT } = workerMachine.getSettings()
    const queueConfig: RunsMetadataQueueConfig = {
        isOtelEnabled: OTEL_ENABLED ?? false,
        redisFailedJobRetentionDays: REDIS_FAILED_JOB_RETENTION_DAYS,
        redisFailedJobRetentionMaxCount: REDIS_FAILED_JOB_RETENTION_MAX_COUNT,
    }
    await runsMetadataQueue.init(queueConfig)
    log.info({
        message: 'Initialized runs metadata queue for worker',
    }, '[flowWorker#init]')
}
/** Arguments for flowWorker(log).init. */
type FlowWorkerInitParams = {
    workerToken: string
    markAsHealthy: () => Promise<void>
}

View File

@@ -0,0 +1,159 @@
import { apVersionUtil, environmentVariables, exceptionHandler, networkUtils, systemUsage, webhookSecretsUtils, WorkerSystemProp } from '@activepieces/server-shared'
import { apId, assertNotNullOrUndefined, isNil, spreadIfDefined, WorkerMachineHealthcheckRequest, WorkerSettingsResponse } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { engineProcessManager } from '../compute/process/engine-process-manager'
// Settings fetched from the app server on connect; undefined until init().
let settings: WorkerSettingsResponse | undefined
// Token used to authenticate this worker to the app server.
let workerToken: string | undefined
// Stable id for this worker process, generated once at module load.
const workerId = apId()
/**
 * Process-wide holder for this worker's identity, settings, and derived URLs.
 * Most accessors throw until init() has stored the settings received from
 * the app server.
 */
export const workerMachine = {
    getWorkerId: () => workerId,
    getWorkerToken: () => {
        assertNotNullOrUndefined(workerToken, 'Worker token is not set')
        return workerToken
    },
    // Snapshot of host health and configuration, reported to the app server.
    async getSystemInfo(): Promise<WorkerMachineHealthcheckRequest> {
        const { totalRamInBytes, ramUsage } = await systemUsage.getContainerMemoryUsage()
        const cpuUsage = systemUsage.getCpuUsage()
        const ip = (await networkUtils.getPublicIp()).ip
        const diskInfo = await systemUsage.getDiskInfo()
        const cpuCores = await systemUsage.getCpuCores()
        return {
            diskInfo,
            cpuUsagePercentage: cpuUsage,
            ramUsagePercentage: ramUsage,
            totalAvailableRamInBytes: totalRamInBytes,
            totalCpuCores: cpuCores,
            ip,
            // Only settings that are actually present are echoed back.
            workerProps: {
                ...spreadIfDefined('SANDBOX_PROPAGATED_ENV_VARS', settings?.SANDBOX_PROPAGATED_ENV_VARS?.join(',')),
                ...spreadIfDefined('EXECUTION_MODE', settings?.EXECUTION_MODE),
                ...spreadIfDefined('FILE_STORAGE_LOCATION', settings?.FILE_STORAGE_LOCATION),
                ...spreadIfDefined('WORKER_CONCURRENCY', settings?.WORKER_CONCURRENCY?.toString()),
                ...spreadIfDefined('TRIGGER_TIMEOUT_SECONDS', settings?.TRIGGER_TIMEOUT_SECONDS?.toString()),
                ...spreadIfDefined('PAUSED_FLOW_TIMEOUT_DAYS', settings?.PAUSED_FLOW_TIMEOUT_DAYS?.toString()),
                ...spreadIfDefined('FLOW_TIMEOUT_SECONDS', settings?.FLOW_TIMEOUT_SECONDS?.toString()),
                ...spreadIfDefined('LOG_LEVEL', settings?.LOG_LEVEL),
                ...spreadIfDefined('LOG_PRETTY', settings?.LOG_PRETTY),
                ...spreadIfDefined('ENVIRONMENT', settings?.ENVIRONMENT),
                ...spreadIfDefined('MAX_FILE_SIZE_MB', settings?.MAX_FILE_SIZE_MB?.toString()),
                ...spreadIfDefined('SANDBOX_MEMORY_LIMIT', settings?.SANDBOX_MEMORY_LIMIT),
                ...spreadIfDefined('DEV_PIECES', settings?.DEV_PIECES?.join(',')),
                ...spreadIfDefined('S3_USE_SIGNED_URLS', settings?.S3_USE_SIGNED_URLS),
                ...spreadIfDefined('PLATFORM_ID_FOR_DEDICATED_WORKER', settings?.PLATFORM_ID_FOR_DEDICATED_WORKER),
                version: await apVersionUtil.getCurrentRelease(),
            },
            workerId,
            totalSandboxes: engineProcessManager.getTotalSandboxes(),
            freeSandboxes: engineProcessManager.getFreeSandboxes(),
        }
    },
    isDedicatedWorker: () => {
        return !isNil(workerMachine.getSettings().PLATFORM_ID_FOR_DEDICATED_WORKER)
    },
    // Applies settings from the app server; local env vars may override a few.
    init: async (_settings: WorkerSettingsResponse, _workerToken: string, log: FastifyBaseLogger) => {
        settings = {
            ..._settings,
            ...spreadIfDefined('WORKER_CONCURRENCY', environmentVariables.getNumberEnvironment(WorkerSystemProp.WORKER_CONCURRENCY)),
            ...spreadIfDefined('PLATFORM_ID_FOR_DEDICATED_WORKER', environmentVariables.getEnvironment(WorkerSystemProp.PLATFORM_ID_FOR_DEDICATED_WORKER)),
        }
        workerToken = _workerToken
        // Divides by 1024 — assumes SANDBOX_MEMORY_LIMIT is expressed in KB
        // and the process manager wants MB. TODO confirm the unit.
        const memoryLimit = Math.floor(Number(settings.SANDBOX_MEMORY_LIMIT) / 1024)
        await webhookSecretsUtils.init(settings.APP_WEBHOOK_SECRETS)
        engineProcessManager.init(settings.WORKER_CONCURRENCY, {
            env: getEnvironmentVariables(),
            resourceLimits: {
                maxOldGenerationSizeMb: memoryLimit,
                maxYoungGenerationSizeMb: memoryLimit,
                stackSizeMb: memoryLimit,
            },
            execArgv: [],
        }, log)
        exceptionHandler.initializeSentry(settings.SENTRY_DSN)
    },
    hasSettings: () => {
        return !isNil(settings)
    },
    getSettings: () => {
        assertNotNullOrUndefined(settings, 'Settings are not set')
        return settings
    },
    getSettingOrThrow: (prop: keyof WorkerSettingsResponse) => {
        assertNotNullOrUndefined(settings, 'Settings are not set')
        return settings[prop]
    },
    // When app modules run in-process, talk to the local API directly.
    getInternalApiUrl: (): string => {
        if (environmentVariables.hasAppModules()) {
            return 'http://127.0.0.1:3000/'
        }
        const url = environmentVariables.getEnvironmentOrThrow(WorkerSystemProp.FRONTEND_URL)
        return appendSlashAndApi(replaceLocalhost(url))
    },
    getSocketUrlAndPath: (): { url: string, path: string } => {
        if (environmentVariables.hasAppModules()) {
            return {
                url: 'http://127.0.0.1:3000/',
                path: '/socket.io',
            }
        }
        const url = environmentVariables.getEnvironmentOrThrow(WorkerSystemProp.FRONTEND_URL)
        return {
            url: removeTrailingSlash(replaceLocalhost(url)),
            path: '/api/socket.io',
        }
    },
    getPublicApiUrl: (): string => {
        return appendSlashAndApi(replaceLocalhost(getPublicUrl()))
    },
    getPlatformIdForDedicatedWorker: (): string | undefined => {
        return environmentVariables.getEnvironment(WorkerSystemProp.PLATFORM_ID_FOR_DEDICATED_WORKER)
    },
    // Pre-warming defaults to on; only an explicit non-'true' value disables it.
    preWarmCacheEnabled: () => {
        const enabledVar = environmentVariables.getEnvironment(WorkerSystemProp.PRE_WARM_CACHE)
        return isNil(enabledVar) || environmentVariables.getEnvironment(WorkerSystemProp.PRE_WARM_CACHE) === 'true'
    },
}
/**
 * Returns the configured public URL; before settings arrive from the app
 * server, falls back to the FRONTEND_URL environment variable.
 */
function getPublicUrl(): string {
    return isNil(settings)
        ? environmentVariables.getEnvironmentOrThrow(WorkerSystemProp.FRONTEND_URL)
        : settings.PUBLIC_URL
}
/**
 * Rewrites a 'localhost' hostname to '127.0.0.1', returning the URL in
 * WHATWG-normalized form either way.
 */
function replaceLocalhost(urlString: string): string {
    const parsed = new URL(urlString)
    const isLocalhost = parsed.hostname === 'localhost'
    if (isLocalhost) {
        parsed.hostname = '127.0.0.1'
    }
    return parsed.toString()
}
/** Strips a single trailing slash from a URL string, if present. */
function removeTrailingSlash(url: string): string {
    return url.endsWith('/') ? url.slice(0, -1) : url
}
/** Appends 'api/' to a URL, inserting a '/' separator when one is missing. */
function appendSlashAndApi(url: string): string {
    if (url.endsWith('/')) {
        return `${url}api/`
    }
    return `${url}/api/`
}
function getEnvironmentVariables(): Record<string, string | undefined> {
const allowedEnvVariables = workerMachine.getSettings().SANDBOX_PROPAGATED_ENV_VARS
const propagatedEnvVars = Object.fromEntries(allowedEnvVariables.map((envVar) => [envVar, process.env[envVar]]))
return {
...propagatedEnvVars,
NODE_OPTIONS: '--enable-source-maps',
AP_PAUSED_FLOW_TIMEOUT_DAYS: workerMachine.getSettings().PAUSED_FLOW_TIMEOUT_DAYS.toString(),
AP_EXECUTION_MODE: workerMachine.getSettings().EXECUTION_MODE,
AP_DEV_PIECES: workerMachine.getSettings().DEV_PIECES.join(','),
AP_MAX_FILE_SIZE_MB: workerMachine.getSettings().MAX_FILE_SIZE_MB.toString(),
AP_FILE_STORAGE_LOCATION: workerMachine.getSettings().FILE_STORAGE_LOCATION,
AP_S3_USE_SIGNED_URLS: workerMachine.getSettings().S3_USE_SIGNED_URLS,
}
}

View File

@@ -0,0 +1,41 @@
import { EnginePrincipal, PrincipalType } from '@activepieces/shared'
import jwt from 'jsonwebtoken'
import { workerMachine } from './machine'
// Engine token lifetime in seconds (one week).
const ONE_WEEK = 7 * 24 * 3600
// Issuer claim stamped on every engine token.
const ISSUER = 'activepieces'
// Symmetric signing algorithm; the API side shares the same JWT secret.
const ALGORITHM = 'HS256'
export const tokenUtls = {
async generateEngineToken({ jobId, projectId, platformId }: GenerateEngineTokenParams): Promise<string> {
const settings = workerMachine.getSettings()
const secret = settings.JWT_SECRET
const enginePrincipal: EnginePrincipal = {
id: jobId,
type: PrincipalType.ENGINE,
projectId,
platform: {
id: platformId,
},
}
const signOptions: jwt.SignOptions = {
algorithm: ALGORITHM,
expiresIn: ONE_WEEK,
issuer: ISSUER,
}
return new Promise<string>((resolve, reject) => {
jwt.sign(enginePrincipal, secret, signOptions, (err, token) => {
if (err || !token) {
return reject(err || new Error('Failed to generate token'))
}
return resolve(token)
})
})
},
}
type GenerateEngineTokenParams = {
jobId: string
projectId: string
platformId: string
}

View File

@@ -0,0 +1,126 @@
import { inspect } from 'util'
import { triggerRunStats } from '@activepieces/server-shared'
import {
ActivepiecesError,
EngineResponseStatus,
ErrorCode,
FlowTriggerType,
FlowVersion,
PieceTriggerSettings,
PlatformId,
ProjectId,
TriggerHookType,
TriggerPayload,
TriggerRunStatus,
} from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { pieceWorkerCache } from '../cache/piece-worker-cache'
import { engineRunner } from '../compute'
import { workerMachine } from './machine'
import { webhookUtils } from './webhook-utils'
import { workerRedisConnections } from './worker-redis'
export const triggerHooks = (log: FastifyBaseLogger) => ({
extractPayloads: async (
engineToken: string,
params: ExecuteTrigger,
): Promise<ExtractPayloadsResult> => {
const { flowVersion, platformId } = params
if (flowVersion.trigger.type === FlowTriggerType.EMPTY) {
log.warn({
flowVersionId: flowVersion.id,
}, '[WebhookUtils#extractPayload] empty trigger, skipping')
return {
status: TriggerRunStatus.COMPLETED,
payloads: [],
}
}
const { payloads, status, errorMessage } = await getTriggerPayloadsAndStatus(engineToken, log, params)
const triggerSettings = flowVersion.trigger.settings as PieceTriggerSettings
const triggerPiece = await pieceWorkerCache(log).getPiece({
engineToken,
pieceName: triggerSettings.pieceName,
pieceVersion: triggerSettings.pieceVersion,
platformId,
})
await triggerRunStats(log, await workerRedisConnections.useExisting()).save({
platformId,
pieceName: triggerPiece.pieceName,
status,
})
return {
status,
payloads,
errorMessage,
}
},
})
// Outcome of a trigger execution: the payloads it produced, the resulting
// run status, and an error message when the run did not complete.
type ExtractPayloadsResult = {
    payloads: unknown[]
    status: TriggerRunStatus
    errorMessage?: string
}
// Input for a single trigger execution. `simulate` marks a test run (the
// webhook URL gets a '/test' suffix and the engine runs in test mode).
type ExecuteTrigger = {
    jobId: string
    flowVersion: FlowVersion
    projectId: ProjectId
    platformId: PlatformId
    simulate: boolean
    payload: TriggerPayload
    timeoutInSeconds: number
}
async function getTriggerPayloadsAndStatus(
engineToken: string,
log: FastifyBaseLogger,
params: ExecuteTrigger,
): Promise<ExtractPayloadsResult> {
const { payload, flowVersion, projectId, simulate, timeoutInSeconds } = params
try {
const { status, result, standardError } = await engineRunner(log).executeTrigger(engineToken, {
hookType: TriggerHookType.RUN,
flowVersion,
triggerPayload: payload,
platformId: params.platformId,
webhookUrl: await webhookUtils(log).getWebhookUrl({
flowId: flowVersion.flowId,
simulate,
publicApiUrl: workerMachine.getPublicApiUrl(),
}),
projectId,
test: simulate,
timeoutInSeconds,
})
if (status === EngineResponseStatus.OK && result.success) {
return {
payloads: result.output as unknown[],
status: TriggerRunStatus.COMPLETED,
}
}
return {
payloads: [],
status: TriggerRunStatus.FAILED,
errorMessage: result?.message ?? standardError,
}
}
catch (e) {
const isTimeoutError = e instanceof ActivepiecesError && e.error.code === ErrorCode.EXECUTION_TIMEOUT
if (isTimeoutError) {
return {
payloads: [],
status: TriggerRunStatus.TIMED_OUT,
errorMessage: inspect(e),
}
}
return {
payloads: [],
status: TriggerRunStatus.INTERNAL_ERROR,
errorMessage: inspect(e),
}
}
}

View File

@@ -0,0 +1,62 @@
import {
networkUtils,
rejectedPromiseHandler,
} from '@activepieces/server-shared'
import {
FlowId,
FlowVersion,
} from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { workerApiService } from '../api/server-api.service'
export const webhookUtils = (log: FastifyBaseLogger) => ({
    /**
     * Builds the URL third-party apps deliver their events to.
     */
    async getAppWebhookUrl({
        appName,
        publicApiUrl,
    }: {
        appName: string
        publicApiUrl: string
    }): Promise<string | undefined> {
        const path = `v1/app-events/${appName}`
        return networkUtils.combineUrl(publicApiUrl, path)
    },
    /**
     * Builds a flow's webhook URL; simulated (test) runs get a '/test' suffix.
     */
    async getWebhookUrl({
        flowId,
        simulate,
        publicApiUrl,
    }: GetWebhookUrlParams): Promise<string> {
        let suffix: WebhookUrlSuffix = ''
        if (simulate) {
            suffix = '/test'
        }
        return networkUtils.combineUrl(publicApiUrl, `v1/webhooks/${flowId}${suffix}`)
    },
    /**
     * Fire-and-forget: uploads trigger payloads as sample data for the flow.
     * Rejections are routed to the shared handler instead of surfacing here.
     */
    savePayloadsAsSampleData({
        flowVersion,
        projectId,
        payloads,
    }: SaveSampleDataParams): void {
        const pending = workerApiService().savePayloadsAsSampleData({
            flowId: flowVersion.flowId,
            projectId,
            payloads,
        })
        rejectedPromiseHandler(pending, log)
    },
})
// Suffix appended to a flow webhook URL; '/test' marks a simulated run.
type WebhookUrlSuffix = '' | '/test'
// Input for building a flow's webhook URL; `simulate` selects the '/test' suffix.
type GetWebhookUrlParams = {
    flowId: FlowId
    simulate?: boolean
    publicApiUrl: string
}
// Input for uploading trigger payloads as sample data for a flow version.
type SaveSampleDataParams = {
    flowVersion: FlowVersion
    projectId: string
    payloads: unknown[]
}

View File

@@ -0,0 +1,43 @@
import {
distributedLockFactory,
distributedStoreFactory,
redisConnectionFactory,
RedisType,
} from '@activepieces/server-shared'
import { workerMachine } from './machine'
// Redis connection factory for the worker: maps the worker machine's
// environment settings onto the shared redis connection configuration.
export const workerRedisConnections = redisConnectionFactory(() => {
    const {
        REDIS_TYPE,
        REDIS_SSL_CA_FILE,
        REDIS_DB,
        REDIS_HOST,
        REDIS_PASSWORD,
        REDIS_PORT,
        REDIS_URL,
        REDIS_USER,
        REDIS_USE_SSL,
        REDIS_SENTINEL_ROLE,
        REDIS_SENTINEL_HOSTS,
        REDIS_SENTINEL_NAME,
    } = workerMachine.getSettings()
    return {
        // Settings store the type as a plain string; narrow it for the factory.
        REDIS_TYPE: REDIS_TYPE as RedisType,
        // Normalize a null DB index to undefined.
        REDIS_DB: REDIS_DB ?? undefined,
        REDIS_URL,
        REDIS_HOST,
        REDIS_PORT,
        REDIS_USER,
        REDIS_PASSWORD,
        REDIS_USE_SSL,
        REDIS_SSL_CA_FILE,
        REDIS_SENTINEL_NAME,
        REDIS_SENTINEL_HOSTS,
        REDIS_SENTINEL_ROLE,
    }
})
// Lock factory is wired to `create` (fresh connection) while the store is
// wired to `useExisting` (shared connection) — presumably because blocking
// lock operations must not starve the shared connection; verify in factories.
export const workerDistributedLock = distributedLockFactory(workerRedisConnections.create)
export const workerDistributedStore = distributedStoreFactory(workerRedisConnections.useExisting)

View File

@@ -0,0 +1,14 @@
{
"extends": "../tsconfig.server.json",
"compilerOptions": {
"noImplicitOverride": true,
"noPropertyAccessFromIndexSignature": true
},
"files": [],
"include": [],
"references": [
{
"path": "./tsconfig.lib.json"
}
]
}

View File

@@ -0,0 +1,10 @@
{
"extends": "./tsconfig.json",
"compilerOptions": {
"outDir": "../../../dist/out-tsc",
"declaration": true,
"types": ["node"]
},
"include": ["src/**/*.ts"],
"exclude": ["jest.config.ts", "src/**/*.spec.ts", "src/**/*.test.ts"]
}