Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
462 changes: 241 additions & 221 deletions bun.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
activepieces:
image: ghcr.io/activepieces/activepieces:0.77.2
image: ghcr.io/activepieces/activepieces:0.77.3
container_name: activepieces
restart: unless-stopped
## Enable the following line if you already use AP_EXECUTION_MODE with SANDBOX_PROCESS or old activepieces, checking the breaking change documentation for more info.
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "activepieces",
"version": "0.77.2",
"version": "0.77.3",
"rcVersion": "0.78.0-rc.0",
"scripts": {
"prepare": "husky install",
Expand Down
10 changes: 9 additions & 1 deletion packages/engine/src/lib/handler/context/engine-constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ContextVersion } from '@activepieces/pieces-framework'
import { DEFAULT_MCP_DATA, EngineGenericError, ExecuteFlowOperation, ExecutePropsOptions, ExecuteToolOperation, ExecuteTriggerOperation, ExecutionType, FlowVersionState, PlatformId, ProgressUpdateType, Project, ProjectId, ResumePayload, RunEnvironment, TriggerHookType } from '@activepieces/shared'
import { DEFAULT_MCP_DATA, EngineGenericError, ExecuteFlowOperation, ExecutePropsOptions, ExecuteToolOperation, ExecuteTriggerOperation, ExecutionType, flowStructureUtil, FlowVersionState, PlatformId, ProgressUpdateType, Project, ProjectId, ResumePayload, RunEnvironment, TriggerHookType } from '@activepieces/shared'
import { createPropsResolver, PropsResolver } from '../../variables/props-resolver'

type RetryConstants = {
Expand Down Expand Up @@ -29,6 +29,7 @@ type EngineConstantsParams = {
logsFileId?: string
timeoutInSeconds: number
platformId: PlatformId
stepNames: string[]
}

const DEFAULT_RETRY_CONSTANTS: RetryConstants = {
Expand Down Expand Up @@ -67,6 +68,7 @@ export class EngineConstants {
public readonly stepNameToTest?: string
public readonly logsUploadUrl?: string
public readonly logsFileId?: string
public readonly stepNames: string[] = []
private project: Project | null = null

public get isRunningApTests(): boolean {
Expand Down Expand Up @@ -109,6 +111,7 @@ export class EngineConstants {
this.logsFileId = params.logsFileId
this.platformId = params.platformId
this.timeoutInSeconds = params.timeoutInSeconds
this.stepNames = params.stepNames
}

public static fromExecuteFlowInput(input: ExecuteFlowOperation): EngineConstants {
Expand All @@ -133,6 +136,7 @@ export class EngineConstants {
logsFileId: input.logsFileId,
timeoutInSeconds: input.timeoutInSeconds,
platformId: input.platformId,
stepNames: flowStructureUtil.getAllSteps(input.flowVersion.trigger).map((step) => step.name),
})
}

Expand All @@ -156,6 +160,7 @@ export class EngineConstants {
stepNameToTest: undefined,
timeoutInSeconds: input.timeoutInSeconds,
platformId: input.platformId,
stepNames: [],
})
}

Expand All @@ -179,6 +184,7 @@ export class EngineConstants {
stepNameToTest: undefined,
timeoutInSeconds: input.timeoutInSeconds,
platformId: input.platformId,
stepNames: input.flowVersion?.trigger ? flowStructureUtil.getAllSteps(input.flowVersion.trigger).map((step) => step.name) : [],
})
}

Expand All @@ -202,6 +208,7 @@ export class EngineConstants {
stepNameToTest: undefined,
timeoutInSeconds: input.timeoutInSeconds,
platformId: input.platformId,
stepNames: flowStructureUtil.getAllSteps(input.flowVersion.trigger).map((step) => step.name),
})
}
public getPropsResolver(contextVersion: ContextVersion | undefined): PropsResolver {
Expand All @@ -210,6 +217,7 @@ export class EngineConstants {
engineToken: this.engineToken,
apiUrl: this.internalApiUrl,
contextVersion,
stepNames: this.stepNames,
})
}
private async getProject(): Promise<Project> {
Expand Down
15 changes: 10 additions & 5 deletions packages/engine/src/lib/handler/context/flow-execution-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,18 @@ export class FlowExecutorContext {
stepsCount: this.stepsCount + 1,
})
}


public currentState(): Record<string, unknown> {
let flattenedSteps: Record<string, unknown> = extractOutput(this.steps)
public currentState(referencedStepNames?: string[]): Record<string, unknown> {
const referencedSteps = referencedStepNames
? referencedStepNames.reduce((acc, stepName) => {
if (this.steps[stepName]) acc[stepName] = this.steps[stepName]
return acc
}, {} as Record<string, StepOutput>)
: this.steps

let flattenedSteps: Record<string, unknown> = extractOutput(referencedSteps)
let targetMap = this.steps

this.currentPath.path.forEach(([stepName, iteration]) => {
const stepOutput = targetMap[stepName]
if (!stepOutput.output || stepOutput.type !== FlowActionType.LOOP_ON_ITEMS) {
Expand All @@ -182,8 +189,6 @@ export class FlowExecutorContext {
})
return flattenedSteps
}


}

function extractOutput(steps: Record<string, StepOutput>): Record<string, unknown> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
StepOutputStatus,
} from '@activepieces/shared'
import { createPropsResolver } from '../../variables/props-resolver'
import { EngineConstants } from './engine-constants'
import { FlowExecutorContext } from './flow-execution-context'

export const testExecutionContext = {
Expand All @@ -22,6 +23,7 @@ export const testExecutionContext = {
engineToken,
apiUrl,
sampleData,
engineConstants,
}: TestExecutionParams): Promise<FlowExecutorContext> {
let flowExecutionContext = FlowExecutorContext.empty()
if (isNil(flowVersion)) {
Expand Down Expand Up @@ -55,6 +57,7 @@ export const testExecutionContext = {
projectId,
engineToken,
contextVersion: LATEST_CONTEXT_VERSION,
stepNames: engineConstants.stepNames,
}).resolve<{ items: unknown[] }>({
unresolvedInput: step.settings,
executionState: flowExecutionContext,
Expand Down Expand Up @@ -90,6 +93,7 @@ export const testExecutionContext = {


type TestExecutionParams = {
engineConstants: EngineConstants
flowVersion?: FlowVersion
excludedStepName?: string
projectId: string
Expand Down
2 changes: 2 additions & 0 deletions packages/engine/src/lib/helper/piece-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const pieceHelper = {
projectId: operation.projectId,
engineToken: operation.engineToken,
sampleData: operation.sampleData,
engineConstants: constants,
})
const { property, piece } = await pieceLoader.getPropOrThrow({ pieceName: operation.pieceName, pieceVersion: operation.pieceVersion, actionOrTriggerName: operation.actionOrTriggerName, propertyName: operation.propertyName, devPieces: EngineConstants.DEV_PIECES })

Expand All @@ -48,6 +49,7 @@ export const pieceHelper = {
projectId: constants.projectId,
engineToken: constants.engineToken,
contextVersion: piece.getContextInfo?.().version,
stepNames: constants.stepNames,
}).resolve<
StaticPropsValue<PiecePropertyMap>
>({
Expand Down
6 changes: 5 additions & 1 deletion packages/engine/src/lib/helper/trigger-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const triggerHelper = {
engineToken: constants.engineToken,
devPieces: constants.devPieces,
propertySettings,
stepNames: constants.stepNames,
})
const isOldVersionOrNotSupported = isNil(pieceTrigger.onStart)
if (isOldVersionOrNotSupported) {
Expand Down Expand Up @@ -89,6 +90,7 @@ export const triggerHelper = {
engineToken: params.engineToken,
devPieces: constants.devPieces,
propertySettings,
stepNames: constants.stepNames,
})

const appListeners: Listener[] = []
Expand Down Expand Up @@ -272,7 +274,7 @@ type ExecuteTriggerParams = {
constants: EngineConstants
}

async function prepareTriggerExecution({ pieceName, pieceVersion, triggerName, input, propertySettings, projectId, apiUrl, engineToken, devPieces }: PrepareTriggerExecutionParams) {
async function prepareTriggerExecution({ pieceName, pieceVersion, triggerName, input, propertySettings, projectId, apiUrl, engineToken, devPieces, stepNames }: PrepareTriggerExecutionParams) {
const { piece, pieceTrigger } = await pieceLoader.getPieceAndTriggerOrThrow({
pieceName,
pieceVersion,
Expand All @@ -285,6 +287,7 @@ async function prepareTriggerExecution({ pieceName, pieceVersion, triggerName, i
projectId,
engineToken,
contextVersion: piece.getContextInfo?.().version,
stepNames,
}).resolve<StaticPropsValue<PiecePropertyMap>>({
unresolvedInput: input,
executionState: FlowExecutorContext.empty(),
Expand All @@ -309,4 +312,5 @@ type PrepareTriggerExecutionParams = {
apiUrl: string
engineToken: string
devPieces: string[]
stepNames: string[]
}
1 change: 1 addition & 0 deletions packages/engine/src/lib/operations/flow.operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const executieSingleStepOrFlowOperation = async (input: ExecuteFlowOperation): P
projectId: input.projectId,
engineToken: input.engineToken,
sampleData: input.sampleData,
engineConstants: constants,
})
const step = flowStructureUtil.getActionOrThrow(input.stepNameToTest!, input.flowVersion.trigger)
return flowExecutor.execute({
Expand Down
18 changes: 16 additions & 2 deletions packages/engine/src/lib/variables/props-resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const CONNECTIONS = 'connections'
const FLATTEN_NESTED_KEYS_PATTERN = /\{\{\s*flattenNestedKeys(.*?)\}\}/g


export const createPropsResolver = ({ engineToken, projectId, apiUrl, contextVersion }: PropsResolverParams) => {
export const createPropsResolver = ({ engineToken, projectId, apiUrl, contextVersion, stepNames }: PropsResolverParams) => {
return {
resolve: async <T = unknown>(params: ResolveInputParams): Promise<ResolveResult<T>> => {
const { unresolvedInput, executionState } = params
Expand All @@ -21,7 +21,8 @@ export const createPropsResolver = ({ engineToken, projectId, apiUrl, contextVer
censoredInput: unresolvedInput,
}
}
const currentState = executionState.currentState()
const referencedStepNames = extractReferencedStepNames(unresolvedInput, stepNames)
const currentState = executionState.currentState(Array.from(referencedStepNames))
const resolveOptions = {
engineToken,
projectId,
Expand Down Expand Up @@ -80,6 +81,18 @@ const mergeFlattenedKeysArraysIntoOneArray = async (token: string, partsThatNeed
}

export type PropsResolver = ReturnType<typeof createPropsResolver>

function extractReferencedStepNames(input: unknown, stepNames: string[]): Set<string> {
const stringifiedInput = JSON.stringify(input)
const referencedSteps = new Set<string>()
for (const stepName of stepNames) {
if (stringifiedInput.includes(stepName)) {
referencedSteps.add(stepName)
}
}
return referencedSteps
}

/**
* input: `Hello {{firstName}} {{lastName}}`
* tokenThatNeedResolving: [`{{firstName}}`, `{{lastName}}`]
Expand Down Expand Up @@ -251,4 +264,5 @@ type PropsResolverParams = {
projectId: string
apiUrl: string
contextVersion: ContextVersion | undefined
stepNames: string[]
}
27 changes: 11 additions & 16 deletions packages/engine/src/lib/worker-socket.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { inspect } from 'util'
import {
emitWithAck,
EngineGenericError,
EngineOperation,
EngineOperationType,
EngineResponse,
Expand All @@ -10,13 +9,11 @@ import {
EngineStderr,
EngineStdout,
ERROR_MESSAGES_TO_REDACT,
isNil,
} from '@activepieces/shared'
import { io, type Socket } from 'socket.io-client'
import { execute } from './operations'
import { utils } from './utils'

const WORKER_ID = process.env.WORKER_ID
const WS_URL = 'ws://127.0.0.1:12345'

let socket: Socket | undefined
Expand All @@ -28,27 +25,25 @@ async function executeFromSocket(operation: EngineOperation, operationType: Engi
}

export const workerSocket = {
init: (): void => {
if (isNil(WORKER_ID)) {
throw new EngineGenericError('WorkerIdNotSetError', 'WORKER_ID environment variable is not set')
}

init: (sandboxId: string): void => {

socket = io(WS_URL, {
path: '/worker/ws',
auth: {
workerId: WORKER_ID,
sandboxId,
},
autoConnect: true,
autoConnect: false,
reconnection: true,
})


// Redirect console.log/error/warn to socket
const originalLog = console.log
console.log = function (...args): void {
const engineStdout: EngineStdout = {
message: args.join(' ') + '\n',
}
socket?.emit(EngineSocketEvent.ENGINE_STDOUT, engineStdout)
socket?.emit('command', { event: EngineSocketEvent.ENGINE_STDOUT, payload: engineStdout })
originalLog.apply(console, args)
}

Expand All @@ -57,7 +52,7 @@ export const workerSocket = {
const engineStdout: EngineStdout = {
message: args.join(' ') + '\n',
}
socket?.emit(EngineSocketEvent.ENGINE_STDOUT, engineStdout)
socket?.emit('command', { event: EngineSocketEvent.ENGINE_STDOUT, payload: engineStdout })
originalWarn.apply(console, args)
}

Expand All @@ -70,7 +65,7 @@ export const workerSocket = {
const engineStderr: EngineStderr = {
message: sanitizedArgs.join(' ') + '\n',
}
socket?.emit(EngineSocketEvent.ENGINE_STDERR, engineStderr)
socket?.emit('command', { event: EngineSocketEvent.ENGINE_STDERR, payload: engineStderr })

originalError.apply(console, sanitizedArgs)
}
Expand All @@ -91,14 +86,14 @@ export const workerSocket = {
}
})


socket.connect()
},

sendToWorkerWithAck: async (
type: EngineSocketEvent,
data: unknown,
): Promise<void> => {
await emitWithAck(socket, type, data, {
await emitWithAck(socket, 'command', { event: type, payload: data }, {
timeoutMs: 4000,
retries: 4,
retryDelayMs: 1000,
Expand All @@ -109,7 +104,7 @@ export const workerSocket = {
const engineStderr: EngineStderr = {
message: inspect(error),
}
await emitWithAck(socket, EngineSocketEvent.ENGINE_STDERR, engineStderr, {
await emitWithAck(socket, 'command', { event: EngineSocketEvent.ENGINE_STDERR, payload: engineStderr }, {
timeoutMs: 3000,
retries: 4,
retryDelayMs: 1000,
Expand Down
9 changes: 5 additions & 4 deletions packages/engine/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { isNil } from '@activepieces/shared'
import { workerSocket } from './lib/worker-socket'

const WORKER_ID = process.env.WORKER_ID
process.title = `engine-${WORKER_ID}`
const SANDBOX_ID = process.env.SANDBOX_ID
process.title = `sandbox-${SANDBOX_ID}`

if (!isNil(WORKER_ID)) {
workerSocket.init()

if (!isNil(SANDBOX_ID)) {
workerSocket.init(SANDBOX_ID)
}

process.on('uncaughtException', (error) => {
Expand Down
4 changes: 2 additions & 2 deletions packages/engine/test/handler/flow-looping.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ describe('flow with looping', () => {
firstLoopAction: codeAction,
}),
executionState: FlowExecutorContext.empty(),
constants: generateMockEngineConstants(),
constants: generateMockEngineConstants({ stepNames: ['loop'] }),
})

const loopOut = result.steps.loop as LoopStepOutput
Expand Down Expand Up @@ -48,7 +48,7 @@ describe('flow with looping', () => {
const result = await flowExecutor.execute({
action: generateArray,
executionState: FlowExecutorContext.empty(),
constants: generateMockEngineConstants(),
constants: generateMockEngineConstants({ stepNames: ['echo_step'] }),
})

const loopOut = result.steps.loop as LoopStepOutput
Expand Down
Loading
Loading