From 384951550f5ff44afd194f226aab42093b3a3443 Mon Sep 17 00:00:00 2001 From: Chitkul Lakshya Date: Mon, 20 Apr 2026 22:11:55 +0530 Subject: [PATCH 1/2] feat: offload github webhooks to async queue Move GitHub App webhook processing off the request path with delivery-id idempotency, background workers, and job status tracking so burst traffic is acknowledged quickly without duplicate processing. Made-with: Cursor --- .../githubAppWebhookIdempotency.test.js | 76 ++++++ backend/models/Project.js | 11 + backend/routes/githubAppWebhook.js | 236 ++++-------------- backend/services/githubWebhookWorker.js | 187 ++++++++++++++ backend/services/webhookQueue.js | 166 ++++++++++++ 5 files changed, 488 insertions(+), 188 deletions(-) create mode 100644 backend/__tests__/githubAppWebhookIdempotency.test.js create mode 100644 backend/services/githubWebhookWorker.js create mode 100644 backend/services/webhookQueue.js diff --git a/backend/__tests__/githubAppWebhookIdempotency.test.js b/backend/__tests__/githubAppWebhookIdempotency.test.js new file mode 100644 index 0000000..6a9332c --- /dev/null +++ b/backend/__tests__/githubAppWebhookIdempotency.test.js @@ -0,0 +1,76 @@ +const request = require('supertest'); +const express = require('express'); + +jest.mock('../middleware/verifyGithub', () => (_req, _res, next) => next()); + +const mockProcessGithubWebhookJob = jest.fn(); +jest.mock('../services/githubWebhookWorker', () => ({ + processGithubWebhookJob: (...args) => mockProcessGithubWebhookJob(...args), +})); + +const { + waitForWebhookQueueIdle, + __resetWebhookQueueForTests, +} = require('../services/webhookQueue'); + +const buildPushPayload = () => ({ + repository: { + id: 99, + name: 'repo-a', + full_name: 'owner-a/repo-a', + }, + sender: { + login: 'octocat', + }, + commits: [ + { + id: 'sha-1', + message: 'feat: update architecture flow', + added: ['src/a.js'], + modified: ['src/b.js'], + removed: [], + }, + ], +}); + +describe('GitHub webhook queue idempotency', () => { + beforeEach(() => { + jest.clearAllMocks(); + __resetWebhookQueueForTests(); + mockProcessGithubWebhookJob.mockResolvedValue({ + projectId: 'project-1', + commitCount: 1, + }); + }); + + afterEach(() => { + __resetWebhookQueueForTests(); + }); + + test('same delivery ID submitted twice only queues/processes once', async () => { + const app = express(); + app.use(express.json()); + app.set('io', { emit: jest.fn() }); + app.use('/api/github-app', require('../routes/githubAppWebhook')); + + const first = await request(app) + .post('/api/github-app/webhook') + .set('x-github-event', 'push') + .set('x-github-delivery', 'delivery-dup-1') + .send(buildPushPayload()); + + const second = await request(app) + .post('/api/github-app/webhook') + .set('x-github-event', 'push') + .set('x-github-delivery', 'delivery-dup-1') + .send(buildPushPayload()); + + expect(first.status).toBe(202); + expect(first.body.duplicate).toBe(false); + expect(second.status).toBe(202); + expect(second.body.duplicate).toBe(true); + + await waitForWebhookQueueIdle(); + expect(mockProcessGithubWebhookJob).toHaveBeenCalledTimes(1); + }); +}); diff --git a/backend/models/Project.js b/backend/models/Project.js index f76a99f..eb375c0 100644 --- a/backend/models/Project.js +++ b/backend/models/Project.js @@ -26,6 +26,17 @@ const projectSchema = new mongoose.Schema( // Tracking isTrackingActive: { type: Boolean, default: false }, + + // Webhook aggregation snapshot (used to reduce per-commit fanout/write load) + lastWebhookEventAt: { type: Date, default: null }, + lastWebhookCommitCount: { type: Number, default: 0 }, + lastWebhookCommitShas: { type: [String], default: [] }, + lastWebhookChangedFiles: { type: [String], default: [] }, + lastWebhookPusher: { type: String, default: null }, + lastWebhookDeliveryId: { type: String, default: null }, + lastWebhookAiSummary: { type: String, default: null }, + lastWebhookAiTaskMentions: { type: Number, default: 0 }, + lastWebhookAiAnalyzedCommits: { type: Number, default: 0 }, }, { timestamps: true, diff --git a/backend/routes/githubAppWebhook.js b/backend/routes/githubAppWebhook.js index a604423..55fc210 100644 --- a/backend/routes/githubAppWebhook.js +++ b/backend/routes/githubAppWebhook.js @@ -1,207 +1,67 @@ const express = require('express'); const router = express.Router(); const verifyGithub = require('../middleware/verifyGithub'); -const { analyzeCommit } = require('../utils/commitAnalysisService'); -const ProjectTask = require('../models/ProjectTask'); -const Project = require('../models/Project'); -const Step = require('../models/Step'); -const User = require('../models/User'); -const Repository = require('../models/Repository'); -const Session = require('../models/Session'); -const { normalizeDoc } = require('../utils/normalize'); -const { getProjectWithSteps } = require('../utils/projectHelper'); - -const normalizeTaskStatus = (value) => String(value || '').trim().toLowerCase(); -const COMMIT_CODE_REGEX = /\b\d{10}\b/g; - -const extractCommitCodes = (message) => { - const matches = String(message || '').match(COMMIT_CODE_REGEX); - return [...new Set(matches || [])]; -}; - -const computeStatusFromCommit = ({ fromStatus, hasOwnerGeneratedCommitCode }) => { - const normalizedFrom = normalizeTaskStatus(fromStatus); - - if (hasOwnerGeneratedCommitCode) { - return 'Done'; - } - - if (normalizedFrom === 'done' || normalizedFrom.includes('complete')) { - return null; - } - - return 'In Progress'; +const { processGithubWebhookJob } = require('../services/githubWebhookWorker'); +const { + registerWebhookProcessor, + enqueueWebhookJob, + getWebhookJobStatus, +} = require('../services/webhookQueue'); + +const isDebugWebhookEnabled = + process.env.DEBUG_WEBHOOKS === 'true' || String(process.env.LOG_LEVEL || '').toLowerCase() === 'debug'; + +const debugWebhookLog = (...args) => { + if (!isDebugWebhookEnabled) return; + console.log(...args); }; -async function logTaskProgressActivity({ recipients, taskTitle, projectName, actorName, projectId, taskId, fromStatus, toStatus }) { - const uniqueRecipients = [...new Set((recipients || []).filter(Boolean))]; - if (uniqueRecipients.length === 0) return; - - const now = new Date(); - await Session.insertMany( - uniqueRecipients.map((uid) => ({ - userId: uid, - startTime: now, - endTime: now, - duration: 0, - activeDuration: 0, - date: now.toISOString().split('T')[0], - eventType: 'task-progressed', - title: `Task moved to ${toStatus}: ${taskTitle}`, - source: projectName || 'Tasks', - actorName: actorName || 'GitHub', - metadata: { - projectId: String(projectId || ''), - taskId: String(taskId || ''), - projectName: projectName || null, - fromStatus: fromStatus || null, - toStatus: toStatus || null, - trigger: 'commit', - }, - })) - ); -} +registerWebhookProcessor(processGithubWebhookJob); -// POST /api/github-app/webhook — GitHub App webhook handler +// POST /api/github-app/webhook — enqueue webhook for async processing router.post('/webhook', verifyGithub, async (req, res) => { try { const event = req.headers['x-github-event']; - console.log(`[GitHub App Webhook] Received event: ${event}`); - - // Handle installation events - if (event === 'installation' || event === 'installation_repositories') { - console.log(`[GitHub App Webhook] Installation event processed`); - return res.status(200).json({ message: 'Installation event acknowledged' }); - } - - // Handle push events - if (event !== 'push') { - return res.status(200).json({ message: `Event ${event} ignored` }); - } - - const { commits, repository, sender, installation } = req.body; - - if (!commits || commits.length === 0) { - return res.status(200).json({ message: 'No commits to process' }); - } - - console.log(`[GitHub App Webhook] Processing ${commits.length} commits from ${repository?.full_name}`); - - // Try to find the project this repository is linked to - const repoFullName = repository?.full_name; - const repoId = repository?.id?.toString(); - let linkedProject = null; + const deliveryId = req.headers['x-github-delivery']; + const normalizedDeliveryId = String(deliveryId || '').trim(); - if (repoFullName) { - const [repoOwner, repoName] = repoFullName.split('/'); - linkedProject = await Project.findOne({ - githubRepoOwner: repoOwner, - githubRepoName: repoName, - }).lean(); - } - - if (!linkedProject && repoId) { - linkedProject = await Project.findOne({ githubRepoIds: repoId }).lean(); - } - - const results = []; - - for (const commit of commits) { - const message = commit.message; - const analysis = await analyzeCommit(message); - const commitCodesInMessage = extractCommitCodes(message); - - let task = null; - let displayId = analysis.id || null; - - if (analysis.found && analysis.id) { - displayId = analysis.id; - task = await ProjectTask.findOne({ displayId }).lean(); - if (!task) { - const taskByDisplayRegex = await ProjectTask.findOne({ - displayId: { $regex: `^${String(displayId).replace(/[-[\]{}()*+?.,\\^$|#\s]/g, '\\$&')}$`, $options: 'i' } - }).lean(); - task = taskByDisplayRegex || null; - } - } - - if (!task && commitCodesInMessage.length > 0) { - task = await ProjectTask.findOne({ commitCode: { $in: commitCodesInMessage } }).lean(); - } - - if (!task) { - results.push({ commit: commit.id?.substring(0, 7), status: 'no_task_found' }); - continue; - } - - // Update task with commit info - const fromStatus = task.status; - const hasOwnerGeneratedCommitCode = Boolean(task.commitCode && commitCodesInMessage.includes(String(task.commitCode))); - const updateData = { - commitMessage: message, - commitUrl: commit.url, - commitAuthor: sender?.login || commit.author?.name || 'Unknown', - commitTimestamp: commit.timestamp || new Date().toISOString(), - }; - - const nextStatus = computeStatusFromCommit({ - fromStatus, - hasOwnerGeneratedCommitCode, - }); - if (nextStatus) { - updateData.status = nextStatus; - } - - await ProjectTask.updateOne({ _id: task._id }, { $set: updateData }); - - // Emit socket event - const step = await Step.findById(task.stepId).lean(); - if (step) { - const project = await Project.findById(step.projectId).lean(); - if (updateData.status && updateData.status !== fromStatus) { - await logTaskProgressActivity({ - recipients: [project?.ownerUid, ...(project?.team || []), task.assignedTo], - taskTitle: task.title, - projectName: project?.name, - actorName: sender?.login || commit.author?.name || 'GitHub', - projectId: step.projectId, - taskId: task._id, - fromStatus, - toStatus: updateData.status, - }); - } - - const projectData = await getProjectWithSteps(step.projectId); - const io = req.app.get('io'); - if (io) { - const updatedTask = normalizeDoc({ ...task, ...updateData }); - io.emit('taskUpdated', { - task: updatedTask, - taskId: String(task._id), - status: updateData.status || task.status, - projectId: step.projectId.toString(), - }); - io.emit('projectUpdate', { - projectId: projectData.id, - project: projectData, - }); - } - } - - results.push({ - commit: commit.id?.substring(0, 7), - displayId: task.displayId || displayId || null, - status: 'updated', - action: hasOwnerGeneratedCommitCode ? 'Complete' : 'In Progress', + if (!normalizedDeliveryId) { + return res.status(400).json({ + message: 'Missing x-github-delivery header', }); } - console.log(`[GitHub App Webhook] Processed ${results.length} commits`); - res.json({ message: 'Webhook processed', results }); + const enqueueResult = enqueueWebhookJob({ + deliveryId: normalizedDeliveryId, + event, + payload: req.body, + getIo: () => req.app.get('io'), + }); + + debugWebhookLog( + `[GitHub App Webhook] delivery=${normalizedDeliveryId} event=${event || 'unknown'} duplicate=${enqueueResult.duplicate}` + ); + + return res.status(202).json({ + message: enqueueResult.duplicate ? 'Duplicate delivery already queued/processed' : 'Webhook accepted', + duplicate: enqueueResult.duplicate, + deliveryId: normalizedDeliveryId, + job: enqueueResult.job, + }); } catch (error) { console.error('[GitHub App Webhook] Error:', error); - res.status(500).json({ message: 'Webhook processing failed', error: error.message }); + return res.status(500).json({ message: 'Webhook enqueue failed', error: error.message }); + } +}); + +// GET /api/github-app/webhook/jobs/:deliveryId — lightweight internal async status endpoint +router.get('/webhook/jobs/:deliveryId', (req, res) => { + const { deliveryId } = req.params; + const job = getWebhookJobStatus(deliveryId); + if (!job) { + return res.status(404).json({ message: 'Job not found', deliveryId }); } + return res.json({ job }); }); module.exports = router; diff --git a/backend/services/githubWebhookWorker.js b/backend/services/githubWebhookWorker.js new file mode 100644 index 0000000..629d013 --- /dev/null +++ b/backend/services/githubWebhookWorker.js @@ -0,0 +1,187 @@ +const Project = require('../models/Project'); +const { analyzeCommit } = require('../utils/commitAnalysisService'); +const { + DELIVERY_CATCHUP_BATCH_SIZE, + DELIVERY_CATCHUP_MAX_BATCHES, +} = require('../config/freeTierLimits'); + +const isDebugWebhookEnabled = + process.env.DEBUG_WEBHOOKS === 'true' || String(process.env.LOG_LEVEL || '').toLowerCase() === 'debug'; + +const debugWebhookLog = (...args) => { + if (!isDebugWebhookEnabled) return; + console.log(...args); +}; + +const toUniqueStrings = (values) => + [...new Set((values || []).map((v) => String(v || '').trim()).filter(Boolean))]; + +const aggregateProjectEffectsFromCommits = (commits = []) => { + const commitShas = []; + const changedFiles = []; + for (const commit of commits) { + if (commit?.id) commitShas.push(String(commit.id)); + if (Array.isArray(commit?.added)) changedFiles.push(...commit.added); + if (Array.isArray(commit?.modified)) changedFiles.push(...commit.modified); + if (Array.isArray(commit?.removed)) changedFiles.push(...commit.removed); + } + return { + commitShas: toUniqueStrings(commitShas), + changedFiles: toUniqueStrings(changedFiles), + commitCount: commits.length, + }; +}; + +const TASK_REF_REGEX = /\b(?:TASK-\d+|ID-\d+|#\d+)\b/i; + +const analyzeArchitectureImpact = async (commits = []) => { + const commitMessages = commits.map((commit) => String(commit?.message || '').trim()).filter(Boolean); + if (commitMessages.length === 0) { + return { + analyzedCommits: 0, + taskReferenceMentions: 0, + summary: 'No commit messages available for analysis', + }; + } + + // Skip expensive remote LLM calls if no key is configured. + if (!process.env.GROQ_API_KEY) { + const taskReferenceMentions = commitMessages.filter((message) => TASK_REF_REGEX.test(message)).length; + return { + analyzedCommits: commitMessages.length, + taskReferenceMentions, + summary: + taskReferenceMentions > 0 + ? `Detected ${taskReferenceMentions} task reference(s) in commit batch` + : 'No explicit task references detected in commit batch', + }; + } + + const sampleSize = Math.min(3, commitMessages.length); + let taskReferenceMentions = 0; + for (const message of commitMessages.slice(0, sampleSize)) { + const analysis = await analyzeCommit(message); + if (analysis?.found) { + taskReferenceMentions += 1; + } + } + + return { + analyzedCommits: sampleSize, + taskReferenceMentions, + summary: + taskReferenceMentions > 0 + ? `AI analysis found ${taskReferenceMentions} task-linked commit(s) in sampled batch` + : 'AI analysis found no task-linked commits in sampled batch', + }; +}; + +const findLinkedProject = async (repository) => { + const repoFullName = repository?.full_name; + const repoId = repository?.id?.toString(); + let linkedProject = null; + + if (repoFullName) { + const [repoOwner, repoName] = repoFullName.split('/'); + linkedProject = await Project.findOne({ + githubRepoOwner: repoOwner, + githubRepoName: repoName, + }).lean(); + } + + if (!linkedProject && repoId) { + linkedProject = await Project.findOne({ githubRepoIds: repoId }).lean(); + } + + return linkedProject; +}; + +const processGithubWebhookJob = async ({ deliveryId, event, payload, getIo }) => { + if (event === 'installation' || event === 'installation_repositories') { + return { ignored: true, reason: 'installation_event' }; + } + + if (event !== 'push') { + return { ignored: true, reason: `event_${event || 'unknown'}_ignored` }; + } + + const { commits, repository, sender } = payload || {}; + if (!Array.isArray(commits) || commits.length === 0) { + return { ignored: true, reason: 'no_commits' }; + } + + const maxProcessableCommits = DELIVERY_CATCHUP_BATCH_SIZE * DELIVERY_CATCHUP_MAX_BATCHES; + const commitsToProcess = commits.slice(0, maxProcessableCommits); + const droppedCommits = Math.max(0, commits.length - commitsToProcess.length); + + const linkedProject = await findLinkedProject(repository); + if (!linkedProject) { + return { + ignored: true, + reason: 'no_linked_project', + droppedCommits, + }; + } + + const linkedProjectId = String(linkedProject._id || linkedProject.id); + const effect = { + projectId: linkedProjectId, + projectName: linkedProject.name || repository?.name || 'Project', + repository: repository?.full_name || null, + ...aggregateProjectEffectsFromCommits(commitsToProcess), + }; + + const architectureAnalysis = await analyzeArchitectureImpact(commitsToProcess); + const now = new Date(); + + await Project.updateOne( + { _id: effect.projectId }, + { + $set: { + lastWebhookEventAt: now, + lastWebhookCommitCount: effect.commitCount, + lastWebhookCommitShas: effect.commitShas, + lastWebhookChangedFiles: effect.changedFiles, + lastWebhookPusher: sender?.login || null, + lastWebhookAiSummary: architectureAnalysis.summary, + lastWebhookAiTaskMentions: architectureAnalysis.taskReferenceMentions, + lastWebhookAiAnalyzedCommits: architectureAnalysis.analyzedCommits, + lastWebhookDeliveryId: String(deliveryId || ''), + updatedAt: now, + }, + } + ); + + const io = typeof getIo === 'function' ? getIo() : null; + if (io) { + io.emit('projectUpdate', { + projectId: effect.projectId, + eventType: 'github_push_aggregated', + summary: { + projectName: effect.projectName, + repository: effect.repository, + commitCount: effect.commitCount, + changedFiles: effect.changedFiles, + pusher: sender?.login || null, + aiSummary: architectureAnalysis.summary, + processedAt: now.toISOString(), + }, + }); + } + + debugWebhookLog( + `[GitHub Worker] Delivery ${deliveryId} processed (${effect.commitCount} commits) for project ${effect.projectId}` + ); + + return { + projectId: effect.projectId, + commitCount: effect.commitCount, + changedFilesCount: effect.changedFiles.length, + droppedCommits, + aiSummary: architectureAnalysis.summary, + }; +}; + +module.exports = { + processGithubWebhookJob, +}; diff --git a/backend/services/webhookQueue.js b/backend/services/webhookQueue.js new file mode 100644 index 0000000..d43b4c9 --- /dev/null +++ b/backend/services/webhookQueue.js @@ -0,0 +1,166 @@ +const { getSafeEnvInt } = require('../utils/safeEnv'); + +const MAX_STORED_JOBS = getSafeEnvInt('WEBHOOK_QUEUE_MAX_STORED_JOBS', 100, 5000, 1000); + +const jobsByDeliveryId = new Map(); +const queue = []; + +let isDraining = false; +let webhookProcessor = null; + +const toIsoNow = () => new Date().toISOString(); + +const toPublicJob = (job) => { + if (!job) return null; + return { + deliveryId: job.deliveryId, + event: job.event, + status: job.status, + attempts: job.attempts, + createdAt: job.createdAt, + startedAt: job.startedAt || null, + completedAt: job.completedAt || null, + updatedAt: job.updatedAt, + result: job.result || null, + error: job.error || null, + }; +}; + +const pruneOldJobs = () => { + if (jobsByDeliveryId.size <= MAX_STORED_JOBS) return; + const removable = jobsByDeliveryId.size - MAX_STORED_JOBS; + let removed = 0; + for (const [deliveryId, job] of jobsByDeliveryId.entries()) { + if (removed >= removable) break; + if (job.status === 'completed' || job.status === 'failed') { + jobsByDeliveryId.delete(deliveryId); + removed += 1; + } + } +}; + +const processOneJob = async (queuedItem) => { + const job = jobsByDeliveryId.get(queuedItem.deliveryId); + if (!job) return; + if (!webhookProcessor) { + throw new Error('Webhook queue processor is not registered'); + } + + job.status = 'processing'; + job.attempts += 1; + job.startedAt = job.startedAt || toIsoNow(); + job.updatedAt = toIsoNow(); + + try { + const result = await webhookProcessor(queuedItem); + job.status = 'completed'; + job.result = result || null; + job.completedAt = toIsoNow(); + job.updatedAt = toIsoNow(); + } catch (error) { + job.status = 'failed'; + job.error = error?.message || 'Unknown worker error'; + job.completedAt = toIsoNow(); + job.updatedAt = toIsoNow(); + } +}; + +const drainQueue = async () => { + if (isDraining) return; + isDraining = true; + try { + while (queue.length > 0) { + const queuedItem = queue.shift(); + await processOneJob(queuedItem); + } + } finally { + isDraining = false; + } +}; + +const scheduleDrain = () => { + setImmediate(() => { + drainQueue().catch((error) => { + console.error('[WebhookQueue] Drain failure:', error); + }); + }); +}; + +const registerWebhookProcessor = (processor) => { + webhookProcessor = processor; +}; + +const enqueueWebhookJob = ({ deliveryId, event, payload, getIo }) => { + const normalizedDeliveryId = String(deliveryId || '').trim(); + if (!normalizedDeliveryId) { + throw new Error('deliveryId is required for webhook queue idempotency'); + } + + const existingJob = jobsByDeliveryId.get(normalizedDeliveryId); + if (existingJob) { + return { + duplicate: true, + job: toPublicJob(existingJob), + }; + } + + const job = { + deliveryId: normalizedDeliveryId, + event: String(event || '').trim() || null, + status: 'queued', + attempts: 0, + createdAt: toIsoNow(), + updatedAt: toIsoNow(), + startedAt: null, + completedAt: null, + result: null, + error: null, + }; + jobsByDeliveryId.set(normalizedDeliveryId, job); + + queue.push({ + deliveryId: normalizedDeliveryId, + event: job.event, + payload: payload || {}, + getIo: typeof getIo === 'function' ? getIo : null, + }); + pruneOldJobs(); + scheduleDrain(); + + return { + duplicate: false, + job: toPublicJob(job), + }; +}; + +const getWebhookJobStatus = (deliveryId) => { + const normalizedDeliveryId = String(deliveryId || '').trim(); + if (!normalizedDeliveryId) return null; + const job = jobsByDeliveryId.get(normalizedDeliveryId); + return toPublicJob(job); +}; + +const waitForWebhookQueueIdle = async (timeoutMs = 3000) => { + const start = Date.now(); + while (isDraining || queue.length > 0) { + if (Date.now() - start > timeoutMs) { + throw new Error('Timed out waiting for webhook queue to drain'); + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } +}; + +const __resetWebhookQueueForTests = () => { + queue.splice(0, queue.length); + jobsByDeliveryId.clear(); + isDraining = false; + webhookProcessor = null; +}; + +module.exports = { + registerWebhookProcessor, + enqueueWebhookJob, + getWebhookJobStatus, + waitForWebhookQueueIdle, + __resetWebhookQueueForTests, +}; From 4c39de819fd0b71dc1b84f094302184213938adb Mon Sep 17 00:00:00 2001 From: Chitkul Lakshya Date: Mon, 20 Apr 2026 22:15:29 +0530 Subject: [PATCH 2/2] feat: add operational metrics and load shedding guardrails Expose internal memory and webhook queue metrics, and add heap-based load shedding for heavy endpoints with allowlisted core paths to reduce OOM risk under free-tier pressure. Made-with: Cursor --- .../internalMetricsAndLoadShedding.test.js | 85 +++++++++++++++++++ backend/index.js | 4 + backend/middleware/loadShedding.js | 59 +++++++++++++ backend/routes/internalMetrics.js | 27 ++++++ backend/services/webhookQueue.js | 21 +++++ 5 files changed, 196 insertions(+) create mode 100644 backend/__tests__/internalMetricsAndLoadShedding.test.js create mode 100644 backend/middleware/loadShedding.js create mode 100644 backend/routes/internalMetrics.js diff --git a/backend/__tests__/internalMetricsAndLoadShedding.test.js b/backend/__tests__/internalMetricsAndLoadShedding.test.js new file mode 100644 index 0000000..2ab6a90 --- /dev/null +++ b/backend/__tests__/internalMetricsAndLoadShedding.test.js @@ -0,0 +1,85 @@ +const request = require('supertest'); +const express = require('express'); + +const ORIGINAL_MEMORY_USAGE = process.memoryUsage; + +describe('internal metrics route', () => { + test('returns memory and webhook queue metrics', async () => { + const app = express(); + app.use('/internal', require('../routes/internalMetrics')); + + const res = await request(app).get('/internal/metrics'); + expect(res.status).toBe(200); + expect(res.body).toEqual( + expect.objectContaining({ + memoryMb: expect.objectContaining({ + rss: expect.any(Number), + heapUsed: expect.any(Number), + heapTotal: expect.any(Number), + }), + webhookQueue: expect.objectContaining({ + depth: expect.any(Number), + lagMs: expect.any(Number), + processing: expect.any(Boolean), + trackedJobs: expect.any(Number), + }), + timestamp: expect.any(String), + }) + ); + }); +}); + +describe('load shedding middleware', () => { + const loadSheddingPath = '../middleware/loadShedding'; + + afterEach(() => { + jest.resetModules(); + process.memoryUsage = ORIGINAL_MEMORY_USAGE; + delete process.env.LOAD_SHED_HEAP_LIMIT_MB; + delete process.env.LOAD_SHED_RETRY_AFTER_SECONDS; + delete process.env.LOAD_SHED_HEAVY_PATHS; + }); + + test('returns 503 for heavy route when heap limit is crossed', async () => { + process.env.LOAD_SHED_HEAP_LIMIT_MB = '50'; + + const { loadSheddingMiddleware } = require(loadSheddingPath); + const app = express(); + app.use('/api', loadSheddingMiddleware); + app.get('/api/generate-project', (_req, res) => res.status(200).json({ ok: true })); + + process.memoryUsage = jest.fn(() => ({ + rss: 800 * 1024 * 1024, + heapTotal: 600 * 1024 * 1024, + heapUsed: 550 * 1024 * 1024, + external: 0, + arrayBuffers: 0, + })); + + const res = await request(app).get('/api/generate-project'); + expect(res.status).toBe(503); + expect(res.headers['retry-after']).toBeDefined(); + expect(res.body.reason).toBe('load_shedding'); + }); + + test('does not block allowlisted sessions path even above limit', async () => { + process.env.LOAD_SHED_HEAP_LIMIT_MB = '50'; + + const { loadSheddingMiddleware } = require(loadSheddingPath); + const app = express(); + app.use('/api', loadSheddingMiddleware); + app.get('/api/sessions', (_req, res) => res.status(200).json({ ok: true })); + + process.memoryUsage = jest.fn(() => ({ + rss: 800 * 1024 * 1024, + heapTotal: 600 * 1024 * 1024, + heapUsed: 550 * 1024 * 1024, + external: 0, + arrayBuffers: 0, + })); + + const res = await request(app).get('/api/sessions'); + expect(res.status).toBe(200); + expect(res.body.ok).toBe(true); + }); +}); diff --git a/backend/index.js b/backend/index.js index 1c00e80..a2eedbd 100644 --- a/backend/index.js +++ b/backend/index.js @@ -8,6 +8,7 @@ const helmet = require('helmet'); const { Server } = require("socket.io"); const rateLimit = require('express-rate-limit'); require('dotenv').config(); +const { loadSheddingMiddleware } = require('./middleware/loadShedding'); const app = express(); @@ -81,6 +82,7 @@ const chatRoutes = require('./routes/chatRoutes'); const taskRoutes = require('./routes/taskRoutes'); const calendarRoutes = require('./routes/calendarRoutes'); const supportRoutes = require('./routes/supportRoutes'); +const internalMetricsRoutes = require('./routes/internalMetrics'); @@ -120,6 +122,7 @@ const limiter = rateLimit({ // Apply rate limiting to all requests app.use('/api/', limiter); +app.use('/api/', loadSheddingMiddleware); // Keep raw body only for webhook signature verification routes. const webhookJsonParser = express.json({ @@ -158,6 +161,7 @@ app.use('/api/google', require('./routes/googleRoutes')); app.use('/api/calendar', calendarRoutes); app.use('/api/support', supportRoutes); app.use('/api/cache/sample', require('./routes/redisCacheSampleRoutes')); +app.use('/internal', internalMetricsRoutes); diff --git a/backend/middleware/loadShedding.js b/backend/middleware/loadShedding.js new file mode 100644 index 0000000..bd4cc39 --- /dev/null +++ b/backend/middleware/loadShedding.js @@ -0,0 +1,59 @@ +const { getSafeEnvInt } = require('../utils/safeEnv'); + +const HEAP_LIMIT_MB = getSafeEnvInt('LOAD_SHED_HEAP_LIMIT_MB', 128, 4096, 400); +const RETRY_AFTER_SECONDS = getSafeEnvInt('LOAD_SHED_RETRY_AFTER_SECONDS', 1, 300, 15); + +const ALLOWLIST_PATH_PREFIXES = ['/api/auth', '/api/sessions', '/api/chat']; +const DEFAULT_HEAVY_PATH_PREFIXES = [ + '/api/github-app/webhook', + '/api/webhooks/github', + '/api/generate-project', + '/api/design', + '/api/inspiration', +]; + +const parsePathListEnv = (rawValue) => + String(rawValue || '') + .split(',') + .map((value) => value.trim()) + .filter(Boolean); + +const HEAVY_PATH_PREFIXES = (() => { + const custom = parsePathListEnv(process.env.LOAD_SHED_HEAVY_PATHS); + return custom.length > 0 ? custom : DEFAULT_HEAVY_PATH_PREFIXES; +})(); + +const isPathMatchedByPrefixes = (path, prefixes) => prefixes.some((prefix) => path.startsWith(prefix)); + +const loadSheddingMiddleware = (req, res, next) => { + const requestPath = String((req.originalUrl || req.path || '').split('?')[0]); + + if (isPathMatchedByPrefixes(requestPath, ALLOWLIST_PATH_PREFIXES)) { + return next(); + } + + if (!isPathMatchedByPrefixes(requestPath, HEAVY_PATH_PREFIXES)) { + return next(); + } + + const heapUsedMb = process.memoryUsage().heapUsed / (1024 * 1024); + if (heapUsedMb < HEAP_LIMIT_MB) { + return next(); + } + + res.set('Retry-After', String(RETRY_AFTER_SECONDS)); + return res.status(503).json({ + message: 'Service under memory pressure, please retry shortly.', + reason: 'load_shedding', + heapUsedMb: Number(heapUsedMb.toFixed(2)), + heapLimitMb: HEAP_LIMIT_MB, + }); +}; + +module.exports = { + loadSheddingMiddleware, + ALLOWLIST_PATH_PREFIXES, + HEAVY_PATH_PREFIXES, + HEAP_LIMIT_MB, + RETRY_AFTER_SECONDS, +}; diff --git a/backend/routes/internalMetrics.js b/backend/routes/internalMetrics.js new file mode 100644 index 0000000..c9a64c9 --- /dev/null +++ b/backend/routes/internalMetrics.js @@ -0,0 +1,27 @@ +const express = require('express'); +const router = express.Router(); +const { getWebhookQueueMetrics } = require('../services/webhookQueue'); + +const bytesToMb = (bytes) => Number((bytes / (1024 * 1024)).toFixed(2)); + +router.get('/metrics', (_req, res) => { + const memoryUsage = process.memoryUsage(); + const queueMetrics = getWebhookQueueMetrics(); + + return res.json({ + memoryMb: { + rss: bytesToMb(memoryUsage.rss), + heapUsed: bytesToMb(memoryUsage.heapUsed), + heapTotal: bytesToMb(memoryUsage.heapTotal), + }, + webhookQueue: { + depth: queueMetrics.depth, + lagMs: queueMetrics.lagMs, + processing: queueMetrics.processing, + trackedJobs: queueMetrics.trackedJobs, + }, + timestamp: new Date().toISOString(), + }); +}); + +module.exports = router; diff --git a/backend/services/webhookQueue.js b/backend/services/webhookQueue.js index d43b4c9..0202a97 100644 --- a/backend/services/webhookQueue.js +++ b/backend/services/webhookQueue.js @@ -140,6 +140,26 @@ const getWebhookJobStatus = (deliveryId) => { return toPublicJob(job); }; +const getWebhookQueueMetrics = () => { + const now = Date.now(); + const queuedAgesMs = queue + .map((queuedItem) => { + const job = jobsByDeliveryId.get(queuedItem.deliveryId); + if (!job?.createdAt) return 0; + const createdAtMs = Date.parse(job.createdAt); + if (!Number.isFinite(createdAtMs)) return 0; + return Math.max(0, now - createdAtMs); + }) + .filter((age) => Number.isFinite(age)); + + return { + depth: queue.length, + lagMs: queuedAgesMs.length > 0 ? Math.max(...queuedAgesMs) : 0, + processing: isDraining, + trackedJobs: jobsByDeliveryId.size, + }; +}; + const waitForWebhookQueueIdle = async (timeoutMs = 3000) => { const start = Date.now(); while (isDraining || queue.length > 0) { @@ -161,6 +181,7 @@ module.exports = { registerWebhookProcessor, enqueueWebhookJob, getWebhookJobStatus, + getWebhookQueueMetrics, waitForWebhookQueueIdle, __resetWebhookQueueForTests, };