Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/server/shared/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"main": "./src/index.js",
"typings": "./src/index.d.ts",
"dependencies": {
"@activepieces/pieces-framework": "0.25.0",
"@activepieces/pieces-framework": "0.25.1",
"@activepieces/shared": "0.34.0",
"tslib": "2.6.2",
"pino": "10.1.0",
Expand All @@ -18,7 +18,6 @@
"@sinclair/typebox": "0.34.11",
"pino-loki": "2.1.3",
"async-mutex": "0.4.0",
"@activepieces/import-fresh-webpack": "3.3.0",
"clear-module": "4.1.2",
"ioredis": "5.4.1",
"redlock": "5.0.0-beta.2",
Expand Down
21 changes: 13 additions & 8 deletions packages/server/shared/src/lib/pieces/file-pieces-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { readdir, readFile, stat } from 'node:fs/promises'
import { join, resolve } from 'node:path'
import { cwd } from 'node:process'
import { sep } from 'path'
import importFresh from '@activepieces/import-fresh-webpack'
import { Piece, PieceMetadata, pieceTranslation } from '@activepieces/pieces-framework'
import { extractPieceFromModule } from '@activepieces/shared'
import clearModule from 'clear-module'
Expand Down Expand Up @@ -73,6 +72,14 @@ export const filePiecesUtils = (log: FastifyBaseLogger) => ({
return []
}
},


clearPieceModuleCache: (distFolderPath: string): void => {
const indexPath = join(distFolderPath, 'src', 'index')
const packageJsonPath = join(distFolderPath, 'package.json')
clearModule(indexPath)
clearModule(packageJsonPath)
},
})

const findAllPiecesFolder = async (folderPath: string): Promise<string[]> => {
Expand Down Expand Up @@ -100,13 +107,11 @@ const loadPieceFromFolder = async (
folderPath: string,
): Promise<PieceMetadata | null> => {
const indexPath = join(folderPath, 'src', 'index')
clearModule(indexPath)
const packageJson = importFresh<Record<string, string>>(
join(folderPath, 'package.json'),
)
const module = importFresh<Record<string, unknown>>(
indexPath,
)
const packageJsonPath = join(folderPath, 'package.json')
// eslint-disable-next-line @typescript-eslint/no-var-requires
const packageJson = require(packageJsonPath)
// eslint-disable-next-line @typescript-eslint/no-var-requires
const module = require(indexPath)
const { name: pieceName, version: pieceVersion } = packageJson
const piece = extractPieceFromModule<Piece>({
module,
Expand Down
2 changes: 1 addition & 1 deletion packages/server/worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"main": "./src/index.js",
"typings": "./src/index.d.ts",
"dependencies": {
"@activepieces/pieces-framework": "0.25.0",
"@activepieces/pieces-framework": "0.25.1",
"@activepieces/server-shared": "0.0.2",
"@activepieces/shared": "0.34.0",
"write-file-atomic": "5.0.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ async function buildPieces(pieceNames: string[], io: Server, log: FastifyBaseLog

log.info(chalk.blue.bold(`Build completed in ${buildTime.toFixed(2)} seconds`))

for (const pieceName of pieceNames) {
const distPath = await filePiecesUtils(log).findDistPiecePathByPackageName(`@activepieces/piece-${pieceName}`)
if (distPath) {
filePiecesUtils(log).clearPieceModuleCache(distPath)
}
}

devPiecesState.incrementGeneration()
io.emit(WebsocketClientEvent.REFRESH_PIECE)
}
Expand Down Expand Up @@ -74,7 +81,7 @@ export async function devPiecesBuilder(app: FastifyInstance, io: Server, package
await devPiecesInstaller(app.log).linkSharedActivepiecesPackagesToPiece(packageJsonName)
}

for (const { packageName, pieceDirectory } of pieceInfos) {
for (const { packageName, pieceDirectory, packageJsonName } of pieceInfos) {
app.log.info(chalk.blue(`Starting watch for package: ${packageName}`))
app.log.info(chalk.yellow(`Found piece directory: ${pieceDirectory}`))

Expand All @@ -83,6 +90,7 @@ export async function devPiecesBuilder(app: FastifyInstance, io: Server, package
try {
await buildPieces([packageName], io, app.log)
await devPiecesInstaller(app.log).linkSharedActivepiecesPackagesToEachOther()
await devPiecesInstaller(app.log).linkSharedActivepiecesPackagesToPiece(packageJsonName)
}
catch (error) {
app.log.error(error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export const devPiecesInstaller = (log: FastifyBaseLogger) => ({

for (const dependency of apDependencies) {
try {
await spawnWithKill({ cmd: `bun link --cwd ${packagePath} --save ${dependency} --quiet`, printOutput: true })
await spawnWithKill({ cmd: `bun link --cwd ${packagePath} --save ${dependency} --silent`, printOutput: true })
}
catch (e: unknown) {
const errorMessage = e instanceof Error ? e.message : String(e)
Expand All @@ -54,7 +54,7 @@ export const devPiecesInstaller = (log: FastifyBaseLogger) => ({
const packages = sharedPiecesPackages()
for (const [name, pkg] of Object.entries(packages)) {
try {
await spawnWithKill({ cmd: `bun link --cwd ${pkg.path} --quiet`, printOutput: true })
await spawnWithKill({ cmd: `bun link --cwd ${pkg.path} --silent`, printOutput: true })
}
catch (e: unknown) {
const errorMessage = e instanceof Error ? e.message : String(e)
Expand Down Expand Up @@ -82,7 +82,7 @@ export const devPiecesInstaller = (log: FastifyBaseLogger) => ({

for (const dependency of apDependencies) {
try {
await spawnWithKill({ cmd: `bun link --cwd ${pkg.path} --save ${dependency} --quiet`, printOutput: true })
await spawnWithKill({ cmd: `bun link --cwd ${pkg.path} --save ${dependency} --silent`, printOutput: true })
}
catch (e: unknown) {
const errorMessage = e instanceof Error ? e.message : String(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ async function executeSingleTask<Result extends OperationResult>(log: FastifyBas
}, async (span) => {
let sandbox: Sandbox | undefined
try {
sandbox = sandboxPool.allocate(log)
sandbox = await sandboxPool.allocate(log)
await sandbox.start({ flowVersionId: getFlowVersionId(operation, operationType), platformId: operation.platformId })

const { engine, stdError, stdOut } = await sandbox.execute(operationType, operation, { timeoutInSeconds })
Expand Down
18 changes: 16 additions & 2 deletions packages/server/worker/src/lib/compute/sandbox/sandbox-pool.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { ApEnvironment, ExecutionMode, isNil } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { nanoid } from 'nanoid'
import { devPiecesState } from '../../cache/pieces/development/dev-pieces-state'
import { workerMachine } from '../../utils/machine'
import { createSandbox, Sandbox } from './sandbox'

const sandboxes: Map<string, Sandbox> = new Map()
const sandboxGenerations: Map<string, number> = new Map()
let sandboxQueue: string[] = []
let workerConcurrency: number
let sandboxMemoryLimit: number
Expand All @@ -23,14 +25,24 @@ export const sandboxPool = {
reusable = canReuseWorkers()
sandboxQueue = Array.from({ length: workerConcurrency }, () => nanoid())
},
allocate: (log: FastifyBaseLogger): Sandbox => {
allocate: async (log: FastifyBaseLogger): Promise<Sandbox> => {
const sandboxId = sandboxQueue.shift()
if (!sandboxId) {
throw new Error('No sandbox available')
}
const existingSandbox = sandboxes.get(sandboxId)
if (!isNil(existingSandbox)) {
return existingSandbox
const workerGeneration = sandboxGenerations.get(sandboxId) ?? 0
if (devPiecesState.isWorkerGenerationStale(workerGeneration)) {
log.debug({ sandboxId, workerGeneration }, 'Sandbox generation stale, restarting')
await existingSandbox.shutdown()
sandboxes.delete(sandboxId)
sandboxGenerations.delete(sandboxId)
}
else {
log.debug({ sandboxId, workerGeneration }, 'Sandbox generation is up to date, reusing')
return existingSandbox
}
}
const workerSettings = workerMachine.getSettings()
const allowedEnvVariables = workerSettings.SANDBOX_PROPAGATED_ENV_VARS
Expand All @@ -51,6 +63,7 @@ export const sandboxPool = {
reusable,
})
sandboxes.set(sandboxId, newSandbox)
sandboxGenerations.set(sandboxId, devPiecesState.getGeneration())
return newSandbox
},
release: async (sandbox: Sandbox | undefined) => {
Expand All @@ -68,6 +81,7 @@ export const sandboxPool = {
await sandbox.shutdown()
}
sandboxes.clear()
sandboxGenerations.clear()
},
}

Expand Down
Loading