Skip to content

Commit a01be8d

Browse files
authored
fix(piece metadata): dev pieces memory leak by using cached require (activepieces#11127)
1 parent 06546ec commit a01be8d

File tree

7 files changed

+44
-18
lines changed

7 files changed

+44
-18
lines changed

packages/server/shared/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"main": "./src/index.js",
66
"typings": "./src/index.d.ts",
77
"dependencies": {
8-
"@activepieces/pieces-framework": "0.25.0",
8+
"@activepieces/pieces-framework": "0.25.1",
99
"@activepieces/shared": "0.34.0",
1010
"tslib": "2.6.2",
1111
"pino": "10.1.0",
@@ -18,7 +18,6 @@
1818
"@sinclair/typebox": "0.34.11",
1919
"pino-loki": "2.1.3",
2020
"async-mutex": "0.4.0",
21-
"@activepieces/import-fresh-webpack": "3.3.0",
2221
"clear-module": "4.1.2",
2322
"ioredis": "5.4.1",
2423
"redlock": "5.0.0-beta.2",

packages/server/shared/src/lib/pieces/file-pieces-utils.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { readdir, readFile, stat } from 'node:fs/promises'
22
import { join, resolve } from 'node:path'
33
import { cwd } from 'node:process'
44
import { sep } from 'path'
5-
import importFresh from '@activepieces/import-fresh-webpack'
65
import { Piece, PieceMetadata, pieceTranslation } from '@activepieces/pieces-framework'
76
import { extractPieceFromModule } from '@activepieces/shared'
87
import clearModule from 'clear-module'
@@ -73,6 +72,14 @@ export const filePiecesUtils = (log: FastifyBaseLogger) => ({
7372
return []
7473
}
7574
},
75+
76+
77+
clearPieceModuleCache: (distFolderPath: string): void => {
78+
const indexPath = join(distFolderPath, 'src', 'index')
79+
const packageJsonPath = join(distFolderPath, 'package.json')
80+
clearModule(indexPath)
81+
clearModule(packageJsonPath)
82+
},
7683
})
7784

7885
const findAllPiecesFolder = async (folderPath: string): Promise<string[]> => {
@@ -100,13 +107,11 @@ const loadPieceFromFolder = async (
100107
folderPath: string,
101108
): Promise<PieceMetadata | null> => {
102109
const indexPath = join(folderPath, 'src', 'index')
103-
clearModule(indexPath)
104-
const packageJson = importFresh<Record<string, string>>(
105-
join(folderPath, 'package.json'),
106-
)
107-
const module = importFresh<Record<string, unknown>>(
108-
indexPath,
109-
)
110+
const packageJsonPath = join(folderPath, 'package.json')
111+
// eslint-disable-next-line @typescript-eslint/no-var-requires
112+
const packageJson = require(packageJsonPath)
113+
// eslint-disable-next-line @typescript-eslint/no-var-requires
114+
const module = require(indexPath)
110115
const { name: pieceName, version: pieceVersion } = packageJson
111116
const piece = extractPieceFromModule<Piece>({
112117
module,

packages/server/worker/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"main": "./src/index.js",
66
"typings": "./src/index.d.ts",
77
"dependencies": {
8-
"@activepieces/pieces-framework": "0.25.0",
8+
"@activepieces/pieces-framework": "0.25.1",
99
"@activepieces/server-shared": "0.0.2",
1010
"@activepieces/shared": "0.34.0",
1111
"write-file-atomic": "5.0.1",

packages/server/worker/src/lib/cache/pieces/development/dev-pieces-builder.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ async function buildPieces(pieceNames: string[], io: Server, log: FastifyBaseLog
3535

3636
log.info(chalk.blue.bold(`Build completed in ${buildTime.toFixed(2)} seconds`))
3737

38+
for (const pieceName of pieceNames) {
39+
const distPath = await filePiecesUtils(log).findDistPiecePathByPackageName(`@activepieces/piece-${pieceName}`)
40+
if (distPath) {
41+
filePiecesUtils(log).clearPieceModuleCache(distPath)
42+
}
43+
}
44+
3845
devPiecesState.incrementGeneration()
3946
io.emit(WebsocketClientEvent.REFRESH_PIECE)
4047
}
@@ -74,7 +81,7 @@ export async function devPiecesBuilder(app: FastifyInstance, io: Server, package
7481
await devPiecesInstaller(app.log).linkSharedActivepiecesPackagesToPiece(packageJsonName)
7582
}
7683

77-
for (const { packageName, pieceDirectory } of pieceInfos) {
84+
for (const { packageName, pieceDirectory, packageJsonName } of pieceInfos) {
7885
app.log.info(chalk.blue(`Starting watch for package: ${packageName}`))
7986
app.log.info(chalk.yellow(`Found piece directory: ${pieceDirectory}`))
8087

@@ -83,6 +90,7 @@ export async function devPiecesBuilder(app: FastifyInstance, io: Server, package
8390
try {
8491
await buildPieces([packageName], io, app.log)
8592
await devPiecesInstaller(app.log).linkSharedActivepiecesPackagesToEachOther()
93+
await devPiecesInstaller(app.log).linkSharedActivepiecesPackagesToPiece(packageJsonName)
8694
}
8795
catch (error) {
8896
app.log.error(error)

packages/server/worker/src/lib/cache/pieces/development/dev-pieces-installer.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ export const devPiecesInstaller = (log: FastifyBaseLogger) => ({
3535

3636
for (const dependency of apDependencies) {
3737
try {
38-
await spawnWithKill({ cmd: `bun link --cwd ${packagePath} --save ${dependency} --quiet`, printOutput: true })
38+
await spawnWithKill({ cmd: `bun link --cwd ${packagePath} --save ${dependency} --silent`, printOutput: true })
3939
}
4040
catch (e: unknown) {
4141
const errorMessage = e instanceof Error ? e.message : String(e)
@@ -54,7 +54,7 @@ export const devPiecesInstaller = (log: FastifyBaseLogger) => ({
5454
const packages = sharedPiecesPackages()
5555
for (const [name, pkg] of Object.entries(packages)) {
5656
try {
57-
await spawnWithKill({ cmd: `bun link --cwd ${pkg.path} --quiet`, printOutput: true })
57+
await spawnWithKill({ cmd: `bun link --cwd ${pkg.path} --silent`, printOutput: true })
5858
}
5959
catch (e: unknown) {
6060
const errorMessage = e instanceof Error ? e.message : String(e)
@@ -82,7 +82,7 @@ export const devPiecesInstaller = (log: FastifyBaseLogger) => ({
8282

8383
for (const dependency of apDependencies) {
8484
try {
85-
await spawnWithKill({ cmd: `bun link --cwd ${pkg.path} --save ${dependency} --quiet`, printOutput: true })
85+
await spawnWithKill({ cmd: `bun link --cwd ${pkg.path} --save ${dependency} --silent`, printOutput: true })
8686
}
8787
catch (e: unknown) {
8888
const errorMessage = e instanceof Error ? e.message : String(e)

packages/server/worker/src/lib/compute/operation-handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ async function executeSingleTask<Result extends OperationResult>(log: FastifyBas
206206
}, async (span) => {
207207
let sandbox: Sandbox | undefined
208208
try {
209-
sandbox = sandboxPool.allocate(log)
209+
sandbox = await sandboxPool.allocate(log)
210210
await sandbox.start({ flowVersionId: getFlowVersionId(operation, operationType), platformId: operation.platformId })
211211

212212
const { engine, stdError, stdOut } = await sandbox.execute(operationType, operation, { timeoutInSeconds })

packages/server/worker/src/lib/compute/sandbox/sandbox-pool.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import { ApEnvironment, ExecutionMode, isNil } from '@activepieces/shared'
22
import { FastifyBaseLogger } from 'fastify'
33
import { nanoid } from 'nanoid'
4+
import { devPiecesState } from '../../cache/pieces/development/dev-pieces-state'
45
import { workerMachine } from '../../utils/machine'
56
import { createSandbox, Sandbox } from './sandbox'
67

78
const sandboxes: Map<string, Sandbox> = new Map()
9+
const sandboxGenerations: Map<string, number> = new Map()
810
let sandboxQueue: string[] = []
911
let workerConcurrency: number
1012
let sandboxMemoryLimit: number
@@ -23,14 +25,24 @@ export const sandboxPool = {
2325
reusable = canReuseWorkers()
2426
sandboxQueue = Array.from({ length: workerConcurrency }, () => nanoid())
2527
},
26-
allocate: (log: FastifyBaseLogger): Sandbox => {
28+
allocate: async (log: FastifyBaseLogger): Promise<Sandbox> => {
2729
const sandboxId = sandboxQueue.shift()
2830
if (!sandboxId) {
2931
throw new Error('No sandbox available')
3032
}
3133
const existingSandbox = sandboxes.get(sandboxId)
3234
if (!isNil(existingSandbox)) {
33-
return existingSandbox
35+
const workerGeneration = sandboxGenerations.get(sandboxId) ?? 0
36+
if (devPiecesState.isWorkerGenerationStale(workerGeneration)) {
37+
log.debug({ sandboxId, workerGeneration }, 'Sandbox generation stale, restarting')
38+
await existingSandbox.shutdown()
39+
sandboxes.delete(sandboxId)
40+
sandboxGenerations.delete(sandboxId)
41+
}
42+
else {
43+
log.debug({ sandboxId, workerGeneration }, 'Sandbox generation is up to date, reusing')
44+
return existingSandbox
45+
}
3446
}
3547
const workerSettings = workerMachine.getSettings()
3648
const allowedEnvVariables = workerSettings.SANDBOX_PROPAGATED_ENV_VARS
@@ -51,6 +63,7 @@ export const sandboxPool = {
5163
reusable,
5264
})
5365
sandboxes.set(sandboxId, newSandbox)
66+
sandboxGenerations.set(sandboxId, devPiecesState.getGeneration())
5467
return newSandbox
5568
},
5669
release: async (sandbox: Sandbox | undefined) => {
@@ -68,6 +81,7 @@ export const sandboxPool = {
6881
await sandbox.shutdown()
6982
}
7083
sandboxes.clear()
84+
sandboxGenerations.clear()
7185
},
7286
}
7387

0 commit comments

Comments
 (0)