diff --git a/server/src/browser-management/classes/BrowserPool.ts b/server/src/browser-management/classes/BrowserPool.ts index cd4962a16..16b427e20 100644 --- a/server/src/browser-management/classes/BrowserPool.ts +++ b/server/src/browser-management/classes/BrowserPool.ts @@ -15,6 +15,8 @@ interface BrowserPoolInfo { * @default false */ active: boolean, + + isRobotRun?: boolean; } /** @@ -46,17 +48,29 @@ export class BrowserPool { * @param browser remote browser instance * @param active states if the browser's instance is being actively used */ - public addRemoteBrowser = (id: string, browser: RemoteBrowser, active: boolean = false): void => { + public addRemoteBrowser = (id: string, browser: RemoteBrowser, active: boolean = false, isRobotRun: boolean = false): void => { this.pool = { ...this.pool, [id]: { browser, active, + isRobotRun }, } logger.log('debug', `Remote browser with id: ${id} added to the pool`); }; + public hasActiveRobotRun(): boolean { + return Object.values(this.pool).some(info => info.isRobotRun); + } + + public clearRobotRunState(id: string): void { + if (this.pool[id]) { + this.pool[id].isRobotRun = false; + logger.log('debug', `Robot run state cleared for browser ${id}`); + } + } + /** * Removes the remote browser instance from the pool. * @param id remote browser instance's id @@ -67,6 +81,8 @@ export class BrowserPool { logger.log('warn', `Remote browser with id: ${id} does not exist in the pool`); return false; } + + this.clearRobotRunState(id); delete (this.pool[id]); logger.log('debug', `Remote browser with id: ${id} deleted from the pool`); return true; @@ -97,4 +113,4 @@ export class BrowserPool { logger.log('warn', `No active browser in the pool`); return null; }; -} +} \ No newline at end of file diff --git a/server/src/browser-management/controller.ts b/server/src/browser-management/controller.ts index 24a677ce1..bc9d173fd 100644 --- a/server/src/browser-management/controller.ts +++ b/server/src/browser-management/controller.ts @@ -59,7 +59,7 @@ export const createRemoteBrowserForRun = (userId: string): string => { async (socket: Socket) => { const browserSession = new RemoteBrowser(socket); await browserSession.initialize(userId); - browserPool.addRemoteBrowser(id, browserSession, true); + browserPool.addRemoteBrowser(id, browserSession, true, true); socket.emit('ready-for-run'); }); return id; @@ -154,4 +154,4 @@ export const stopRunningInterpretation = async () => { } else { logger.log('error', 'Cannot stop interpretation: No active browser or generator.'); } -}; +}; \ No newline at end of file diff --git a/server/src/routes/record.ts b/server/src/routes/record.ts index 51d3ff922..9584d6f9d 100644 --- a/server/src/routes/record.ts +++ b/server/src/routes/record.ts @@ -16,6 +16,7 @@ import stealthPlugin from 'puppeteer-extra-plugin-stealth'; import logger from "../logger"; import { getDecryptedProxyConfig } from './proxy'; import { requireSignIn } from '../middlewares/auth'; +import { browserPool } from '../server'; export const router = Router(); chromium.use(stealthPlugin()); @@ -33,6 +34,17 @@ router.all('/', requireSignIn, (req, res, next) => { next() // pass control to the next handler }) +router.use('/', requireSignIn, (req: AuthenticatedRequest, res: Response, next) => { + if (browserPool.hasActiveRobotRun()) { + logger.log('debug', 'Preventing browser initialization - robot run in progress'); + return res.status(403).json({ + error: 'Cannot initialize recording browser while a robot run is in progress' + }); + } + next(); +}); + + /** * GET endpoint for starting the remote browser recording session. * returns session's id @@ -131,4 +143,4 @@ router.get('/interpret', requireSignIn, async (req, res) => { router.get('/interpret/stop', requireSignIn, async (req, res) => { await stopRunningInterpretation(); return res.send('interpretation stopped'); -}); +}); \ No newline at end of file diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index e159d8cba..12a07b5cd 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -496,6 +496,11 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => binaryOutput: {}, }); + const job = await workflowQueue.add( + 'run workflow', + { id, runId, userId: req.user.id, isScheduled: false }, + ); + const plainRun = run.toJSON(); return res.send({ diff --git a/server/src/worker.ts b/server/src/worker.ts index 3010a6b26..a9f8a7ec4 100644 --- a/server/src/worker.ts +++ b/server/src/worker.ts @@ -1,7 +1,8 @@ import { Queue, Worker } from 'bullmq'; import IORedis from 'ioredis'; import logger from './logger'; -import { handleRunRecording } from "./workflow-management/scheduler"; +import { handleRunRecording as handleScheduledRunRecording } from "./workflow-management/scheduler"; +import { handleRunRecording } from './workflow-management/record'; import Robot from './models/Robot'; import { computeNextRun } from './utils/schedule'; @@ -22,9 +23,11 @@ connection.on('error', (err) => { const workflowQueue = new Queue('workflow', { connection }); const worker = new Worker('workflow', async job => { - const { runId, userId, id } = job.data; + const { runId, userId, id, isScheduled = true } = job.data; try { - const result = await handleRunRecording(id, userId); + const result = isScheduled ? + await handleScheduledRunRecording(id, userId) : + await handleRunRecording(id, userId, runId); return result; } catch (error) { logger.error('Error running workflow:', error); @@ -34,23 +37,26 @@ const worker = new Worker('workflow', async job => { worker.on('completed', async (job: any) => { logger.log(`info`, `Job ${job.id} completed for ${job.data.runId}`); - const robot = await Robot.findOne({ where: { 'recording_meta.id': job.data.id } }); - if (robot) { - // Update `lastRunAt` to the current time - const lastRunAt = new Date(); + + if (job.data.isScheduled) { + const robot = await Robot.findOne({ where: { 'recording_meta.id': job.data.id } }); + if (robot) { + // Update `lastRunAt` to the current time + const lastRunAt = new Date(); - // Compute the next run date - if (robot.schedule && robot.schedule.cronExpression && robot.schedule.timezone) { - const nextRunAt = computeNextRun(robot.schedule.cronExpression, robot.schedule.timezone) || undefined; - await robot.update({ - schedule: { - ...robot.schedule, - lastRunAt, - nextRunAt, - }, - }); - } else { - logger.error('Robot schedule, cronExpression, or timezone is missing.'); + // Compute the next run date + if (robot.schedule && robot.schedule.cronExpression && robot.schedule.timezone) { + const nextRunAt = computeNextRun(robot.schedule.cronExpression, robot.schedule.timezone) || undefined; + await robot.update({ + schedule: { + ...robot.schedule, + lastRunAt, + nextRunAt, + }, + }); + } else { + logger.error('Robot schedule, cronExpression, or timezone is missing.'); + } } } }); diff --git a/server/src/workflow-management/classes/Interpreter.ts b/server/src/workflow-management/classes/Interpreter.ts index c8aec13c4..dde3daf6b 100644 --- a/server/src/workflow-management/classes/Interpreter.ts +++ b/server/src/workflow-management/classes/Interpreter.ts @@ -332,6 +332,8 @@ export class WorkflowInterpreter { }, {}) } + this.socket.emit('run-completed', "success"); + logger.log('debug', `Interpretation finished`); this.clearState(); return result; @@ -354,4 +356,4 @@ export class WorkflowInterpreter { this.socket = socket; this.subscribeToPausing(); }; -} +} \ No newline at end of file diff --git a/server/src/workflow-management/record.ts b/server/src/workflow-management/record.ts new file mode 100644 index 000000000..36049595d --- /dev/null +++ b/server/src/workflow-management/record.ts @@ -0,0 +1,231 @@ +// Import core dependencies +import { chromium } from 'playwright-extra'; +import stealthPlugin from 'puppeteer-extra-plugin-stealth'; +import { Page } from "playwright"; + +// Import local utilities and services +import { destroyRemoteBrowser } from '../browser-management/controller'; +import logger from '../logger'; +import { browserPool } from "../server"; +import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "./integrations/gsheet"; +import { BinaryOutputService } from "../storage/mino"; +import { capture } from "../utils/analytics"; + +// Import models and types +import Robot from "../models/Robot"; +import Run from "../models/Run"; +import { WorkflowFile } from "maxun-core"; +import { io, Socket } from 'socket.io-client'; +import { io as serverIo } from "../server"; + +// Enable stealth mode for chromium +chromium.use(stealthPlugin()); + +async function readyForRunHandler(browserId: string, id: string) { + try { + const result = await executeRun(id); + + if (result && result.success) { + logger.info(`Interpretation of ${id} succeeded`); + resetRecordingState(browserId, id); + return result.interpretationInfo; + } else { + logger.error(`Interpretation of ${id} failed`); + await destroyRemoteBrowser(browserId); + resetRecordingState(browserId, id); + return null; + } + + } catch (error: any) { + logger.error(`Error during readyForRunHandler: ${error.message}`); + await destroyRemoteBrowser(browserId); + return null; + } +} + +function resetRecordingState(browserId: string, id: string) { + browserId = ''; + id = ''; +} + +function AddGeneratedFlags(workflow: WorkflowFile) { + const copy = JSON.parse(JSON.stringify(workflow)); + for (let i = 0; i < workflow.workflow.length; i++) { + copy.workflow[i].what.unshift({ + action: 'flag', + args: ['generated'], + }); + } + return copy; +} + +async function executeRun(id: string) { + try { + const run = await Run.findOne({ where: { runId: id } }); + if (!run) { + return { + success: false, + error: 'Run not found' + }; + } + + const plainRun = run.toJSON(); + + const recording = await Robot.findOne({ + where: { 'recording_meta.id': plainRun.robotMetaId }, + raw: true + }); + if (!recording) { + return { + success: false, + error: 'Recording not found' + }; + } + + const browser = browserPool.getRemoteBrowser(plainRun.browserId); + if (!browser) { + throw new Error('Could not access browser'); + } + + let currentPage = await browser.getCurrentPage(); + if (!currentPage) { + throw new Error('Could not create a new page'); + } + + const workflow = AddGeneratedFlags(recording.recording); + const interpretationInfo = await browser.interpreter.InterpretRecording( + workflow, + currentPage, + (newPage: Page) => currentPage = newPage, + plainRun.interpreterSettings + ); + + const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); + const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput( + run, + interpretationInfo.binaryOutput + ); + + await destroyRemoteBrowser(plainRun.browserId); + + const updatedRun = await run.update({ + ...run, + status: 'success', + finishedAt: new Date().toLocaleString(), + browserId: plainRun.browserId, + log: interpretationInfo.log.join('\n'), + serializableOutput: interpretationInfo.serializableOutput, + binaryOutput: uploadedBinaryOutput, + }); + + + let totalRowsExtracted = 0; + let extractedScreenshotsCount = 0; + let extractedItemsCount = 0; + + if (updatedRun.dataValues.binaryOutput && updatedRun.dataValues.binaryOutput["item-0"]) { + extractedScreenshotsCount = 1; + } + + if (updatedRun.dataValues.serializableOutput && updatedRun.dataValues.serializableOutput["item-0"]) { + const itemsArray = updatedRun.dataValues.serializableOutput["item-0"]; + extractedItemsCount = itemsArray.length; + totalRowsExtracted = itemsArray.reduce((total: number, item: any) => { + return total + Object.keys(item).length; + }, 0); + } + + logger.info(`Extracted Items Count: ${extractedItemsCount}`); + logger.info(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); + logger.info(`Total Rows Extracted: ${totalRowsExtracted}`); + + + capture('maxun-oss-run-created-manual', { + runId: id, + created_at: new Date().toISOString(), + status: 'success', + extractedItemsCount, + totalRowsExtracted, + extractedScreenshotsCount, + }); + + // Handle Google Sheets integration + try { + googleSheetUpdateTasks[plainRun.runId] = { + robotId: plainRun.robotMetaId, + runId: plainRun.runId, + status: 'pending', + retries: 5, + }; + await processGoogleSheetUpdates(); + } catch (err: any) { + logger.error(`Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`); + } + + serverIo.of(plainRun.browserId).emit('run-completed', 'success'); + + return { + success: true, + interpretationInfo: updatedRun.toJSON() + }; + + } catch (error: any) { + logger.error(`Error running robot: ${error.message}`); + const run = await Run.findOne({ where: { runId: id } }); + if (run) { + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + }); + + const plainRun = run.toJSON(); + serverIo.of(plainRun.browserId).emit('run-completed', 'failed'); + } + + capture('maxun-oss-run-created-manual', { + runId: id, + created_at: new Date().toISOString(), + status: 'failed', + error_message: error.message, + }); + + + return { + success: false, + error: error.message, + }; + } +} + +/** + * Main function to handle running a recording through the worker process + */ +export async function handleRunRecording(id: string, userId: string, runId: string) { + try { + if (!id || !runId || !userId) { + throw new Error('browserId or runId or userId is undefined'); + } + + const socket = io(`${process.env.BACKEND_URL ? process.env.BACKEND_URL : 'http://localhost:8080'}/${id}`, { + transports: ['websocket'], + rejectUnauthorized: false + }); + + socket.on('ready-for-run', () => readyForRunHandler(id, runId)); + + logger.info(`Running Robot: ${id}`); + + socket.on('disconnect', () => { + cleanupSocketListeners(socket, id, runId); + }); + + } catch (error: any) { + logger.error('Error running robot:', error); + throw error; + } +} + +function cleanupSocketListeners(socket: Socket, browserId: string, id: string) { + socket.off('ready-for-run', () => readyForRunHandler(browserId, id)); + logger.info(`Cleaned up listeners for browserId: ${browserId}, runId: ${id}`); +} diff --git a/src/pages/MainPage.tsx b/src/pages/MainPage.tsx index 0801a9339..c2c63af85 100644 --- a/src/pages/MainPage.tsx +++ b/src/pages/MainPage.tsx @@ -57,6 +57,7 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) if (response) { notify('success', t('main_page.notifications.abort_success', { name: runningRecordingName })); await stopRecording(ids.browserId); + localStorage.removeItem('runningRobot'); } else { notify('error', t('main_page.notifications.abort_failed', { name: runningRecordingName })); } @@ -72,13 +73,14 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) const readyForRunHandler = useCallback((browserId: string, runId: string) => { interpretStoredRecording(runId).then(async (interpretation: boolean) => { if (!aborted) { - if (interpretation) { - notify('success', t('main_page.notifications.interpretation_success', { name: runningRecordingName })); - } else { - notify('success', t('main_page.notifications.interpretation_failed', { name: runningRecordingName })); - // destroy the created browser - await stopRecording(browserId); - } + // if (interpretation) { + // notify('success', t('main_page.notifications.interpretation_success', { name: runningRecordingName })); + // } else { + // notify('success', t('main_page.notifications.interpretation_failed', { name: runningRecordingName })); + // // destroy the created browser + // await stopRecording(browserId); + // } + if (!interpretation) await stopRecording(browserId); } setRunningRecordingName(''); setCurrentInterpretationLog(''); @@ -94,15 +96,37 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) const handleRunRecording = useCallback((settings: RunSettings) => { createRunForStoredRecording(runningRecordingId, settings).then(({ browserId, runId, robotMetaId }: CreateRunResponse) => { setIds({ browserId, runId, robotMetaId }); + + localStorage.setItem('runningRobot', JSON.stringify({ + browserId, + runId, + robotMetaId, + recordingName: runningRecordingName + })); + navigate(`/runs/${robotMetaId}/run/${runId}`); + const socket = io(`${apiUrl}/${browserId}`, { transports: ["websocket"], rejectUnauthorized: false }); setSockets(sockets => [...sockets, socket]); - socket.on('ready-for-run', () => readyForRunHandler(browserId, runId)); + socket.on('debugMessage', debugMessageHandler); + socket.on('run-completed', (status) => { + if (status === 'success') { + notify('success', t('main_page.notifications.interpretation_success', { name: runningRecordingName })); + } else { + notify('error', t('main_page.notifications.interpretation_failed', { name: runningRecordingName })); + } + + localStorage.removeItem('runningRobot'); + setRunningRecordingName(''); + setCurrentInterpretationLog(''); + setRerenderRuns(true); + }); + setContent('runs'); if (browserId) { notify('info', t('main_page.notifications.run_started', { name: runningRecordingName })); @@ -111,10 +135,55 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) } }) return (socket: Socket, browserId: string, runId: string) => { - socket.off('ready-for-run', () => readyForRunHandler(browserId, runId)); socket.off('debugMessage', debugMessageHandler); + socket.off('run-completed'); } - }, [runningRecordingName, sockets, ids, readyForRunHandler, debugMessageHandler]) + }, [runningRecordingName, sockets, ids, notify, debugMessageHandler]) + + useEffect(() => { + const storedRobotInfo = localStorage.getItem('runningRobot'); + + if (storedRobotInfo) { + try { + const { browserId, runId, robotMetaId, recordingName } = JSON.parse(storedRobotInfo); + + setIds({ browserId, runId, robotMetaId }); + setRunningRecordingName(recordingName); + setContent('runs'); + + const socket = io(`${apiUrl}/${browserId}`, { + transports: ["websocket"], + rejectUnauthorized: false + }); + + socket.on('debugMessage', debugMessageHandler); + socket.on('run-completed', (status) => { + if (status === 'success') { + notify('success', t('main_page.notifications.interpretation_success', { name: recordingName })); + } else { + notify('error', t('main_page.notifications.interpretation_failed', { name: recordingName })); + } + + localStorage.removeItem('runningRobot'); + setRunningRecordingName(''); + setCurrentInterpretationLog(''); + setRerenderRuns(true); + }); + + setSockets(prevSockets => [...prevSockets, socket]); + } catch (error) { + console.error('Error restoring robot state:', error); + localStorage.removeItem('runningRobot'); + } + } + + return () => { + sockets.forEach(socket => { + socket.off('debugMessage', debugMessageHandler); + socket.off('run-completed'); + }); + }; + }, []); const handleScheduleRecording = (settings: ScheduleSettings) => { scheduleStoredRecording(runningRecordingId, settings)