diff --git a/docs/webhooks.md b/docs/webhooks.md index abfc39e..fc831bd 100644 --- a/docs/webhooks.md +++ b/docs/webhooks.md @@ -124,6 +124,25 @@ Response (404): { "error": "Dead letter not found or already replayed" } ``` +#### POST `/api/admin/vaults/:id/replay-events` + +Replays all recorded outbox lifecycle events for a single vault to an optional target subscriber. Preserves original event ordering and event IDs/idempotency keys. + +Request Body (Optional): +```json +{ + "subscriber_id": "90b1e428-2f19-4b2b-8a71-3cb56667104b" +} +``` + +Response (200): +```json +{ + "replayed": true, + "count": 3 +} +``` + ## Test-Ping Endpoint `POST /api/webhooks/:id/test` lets subscribers self-verify their delivery URL and HMAC wiring before real vault events start flowing. diff --git a/package-lock.json b/package-lock.json index 8d0d17b..5d8283c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -172,7 +172,6 @@ "resolved": "https://registry.npmjs.org/@aws-sdk/client-s3/-/client-s3-3.1058.0.tgz", "integrity": "sha512-AfED3hhaBZ121NuiBImgnlF98kQRMk6hGPMGfj/Oo1hSaoMFRzM+N4nlICCasUSM2R8QaIRZRYGpZ3fy0ilGZQ==", "license": "Apache-2.0", - "peer": true, "dependencies": { "@aws-crypto/sha1-browser": "5.2.0", "@aws-crypto/sha256-browser": "5.2.0", @@ -673,7 +672,6 @@ "integrity": "sha512-RgHBCvtjbOK2gXSNBNIkNoEc9qoVEtau3hj8gEqKQuL3HZAibKarWFEI3Lfm6EYKkLalOh8eSrj9b+ch9H/VBA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.29.7", "@babel/generator": "^7.29.7", @@ -1169,13 +1167,39 @@ "dev": true, "license": "MIT" }, + "node_modules/@emnapi/core": { + "version": "1.11.1", + "resolved": "https://registry.npmjs.org/@emnapi/core/-/core-1.11.1.tgz", + "integrity": "sha512-RSvbQmHzdKzNsLYa/wHrbc3KN4sYLKAdPZxqiM2HATqv/SBk2/ENSHpvXGaLOMcsAyz0poEGqkmmKYG3OWiJEQ==", + "dev": true, + "license": "MIT", + "optional": true, + "peer": true, + "dependencies": { + "@emnapi/wasi-threads": "1.2.2", + "tslib": "^2.4.0" + } + }, + "node_modules/@emnapi/runtime": { + "version": "1.11.1", + "resolved": "https://registry.npmjs.org/@emnapi/runtime/-/runtime-1.11.1.tgz", + "integrity": "sha512-vgj7R3y3Wgx24IQaGPA/R6YFXLHVMOZ0uVEyIQPaWs+rd1AzfEMXlAC22FYwO1XkKR6NPsq7mUandH8oIRdZFw==", + "dev": true, + "license": "MIT", + "optional": true, + "peer": true, + "dependencies": { + "tslib": "^2.4.0" + } + }, "node_modules/@emnapi/wasi-threads": { - "version": "1.2.1", - "resolved": "https://registry.npmjs.org/@emnapi/wasi-threads/-/wasi-threads-1.2.1.tgz", - "integrity": "sha512-uTII7OYF+/Mes/MrcIOYp5yOtSMLBWSIoLPpcgwipoiKbli6k322tcoFsxoIIxPDqW01SQGAgko4EzZi2BNv2w==", + "version": "1.2.2", + "resolved": "https://registry.npmjs.org/@emnapi/wasi-threads/-/wasi-threads-1.2.2.tgz", + "integrity": "sha512-c95qOXkHdydNKhscBTebqEC1CVAZpyqOfVfBzQ1qgzyl3gfeldUjIggDbIZgDKsHLgnsM+igH7TJ/eAasaVuMA==", "dev": true, "license": "MIT", "optional": true, + "peer": true, "dependencies": { "tslib": "^2.4.0" } @@ -4098,7 +4122,6 @@ "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.9.1.tgz", "integrity": "sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q==", "license": "Apache-2.0", - "peer": true, "engines": { "node": ">=8.0.0" } @@ -4932,6 +4955,40 @@ "node": "^20.19.0 || >=22.12.0" } }, + "node_modules/@rolldown/binding-wasm32-wasi/node_modules/@emnapi/core": { + "version": "1.10.0", + "resolved": "https://registry.npmjs.org/@emnapi/core/-/core-1.10.0.tgz", + "integrity": "sha512-yq6OkJ4p82CAfPl0u9mQebQHKPJkY7WrIuk205cTYnYe+k2Z8YBh11FrbRG/H6ihirqcacOgl2BIO8oyMQLeXw==", + "dev": true, + "license": "MIT", + "optional": true, + "dependencies": { + "@emnapi/wasi-threads": "1.2.1", + "tslib": "^2.4.0" + } + }, + "node_modules/@rolldown/binding-wasm32-wasi/node_modules/@emnapi/runtime": { + "version": "1.10.0", + "resolved": "https://registry.npmjs.org/@emnapi/runtime/-/runtime-1.10.0.tgz", + "integrity": "sha512-ewvYlk86xUoGI0zQRNq/mC+16R1QeDlKQy21Ki3oSYXNgLb45GV1P6A0M+/s6nyCuNDqe5VpaY84BzXGwVbwFA==", + "dev": true, + "license": "MIT", + "optional": true, + "dependencies": { + "tslib": "^2.4.0" + } + }, + "node_modules/@rolldown/binding-wasm32-wasi/node_modules/@emnapi/wasi-threads": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/@emnapi/wasi-threads/-/wasi-threads-1.2.1.tgz", + "integrity": "sha512-uTII7OYF+/Mes/MrcIOYp5yOtSMLBWSIoLPpcgwipoiKbli6k322tcoFsxoIIxPDqW01SQGAgko4EzZi2BNv2w==", + "dev": true, + "license": "MIT", + "optional": true, + "dependencies": { + "tslib": "^2.4.0" + } + }, "node_modules/@rolldown/binding-win32-arm64-msvc": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/@rolldown/binding-win32-arm64-msvc/-/binding-win32-arm64-msvc-1.0.3.tgz", @@ -5611,7 +5668,6 @@ "integrity": "sha512-A0M6ua6H252bVjPvvtSgl2QA4+ET9S5Mtkb2GDyTxIhH/C4qDItT7RQNO5PhMC6NXGYXOR9dIalcDDgBKT7oFA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.60.1", "@typescript-eslint/types": "8.60.1", @@ -5955,7 +6011,6 @@ "integrity": "sha512-UVJyE9MttOsBQIDKw1skb9nAwQuR5wuGD3+82K6JgJlm/Y+KI92oNsMNGZCYdDsVtRHSak0pcV5Dno5+4jh9sw==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -5990,7 +6045,6 @@ "integrity": "sha512-Ifm/pP/tul1qmAecpbVxCBluVE32rKfjf8gYXH4xI2gCv9mRWFhJMHzkPDM4TXlxwPQYIFegymlsy8lXz7optA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "fast-deep-equal": "^3.1.3", "fast-uri": "^3.0.1", @@ -6486,7 +6540,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.10.12", "caniuse-lite": "^1.0.30001782", @@ -7672,7 +7725,6 @@ "integrity": "sha512-XoMjdBOwe/esVgEvLmNsD3IRHkm7fbKIUGvrleloJXUZgDHig2IPWNniv+GwjyJXzuNqVjlr5+4yVUZjycJwfQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -8766,7 +8818,6 @@ "resolved": "https://registry.npmjs.org/graphql/-/graphql-16.14.2.tgz", "integrity": "sha512-Chq1s4CY7jmh8gO2qvLIJyfCDIN+EHLFW/9iShnp1z8FjBQMoodWP1kDC36VAMXXIvAjj4ARa7ntfAV2BrjsbA==", "license": "MIT", - "peer": true, "engines": { "node": "^12.22.0 || ^14.16.0 || ^16.0.0 || >=17.0.0" } @@ -12304,7 +12355,6 @@ "version": "2.6.1", "devOptional": true, "license": "MIT", - "peer": true, "bin": { "jiti": "lib/jiti-cli.mjs" } @@ -13260,7 +13310,6 @@ "integrity": "sha512-do+2UsEKRVT70W/QqP2F2sju2x4p2xZo+5/azXqKjXgTk2jfmzsLjzwW0YI8CBEjy4ZUdU8EunXocXXwJdCrtw==", "dev": true, "license": "MIT", - "peer": true, "funding": { "type": "opencollective", "url": "https://opencollective.com/mobx" @@ -14407,7 +14456,6 @@ "devOptional": true, "hasInstallScript": true, "license": "Apache-2.0", - "peer": true, "dependencies": { "@prisma/config": "6.19.3", "@prisma/engines": "6.19.3" @@ -14698,7 +14746,6 @@ "integrity": "sha512-HNe9WslTbXmFK8o8cmwgAeJFSBvt1bPdHCVKtaaV+WlAN36mpT4hcRpwbf3fY56ar2oIXzsBpOAiIRHAdY0OlQ==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=0.10.0" } @@ -14709,7 +14756,6 @@ "integrity": "sha512-t0BRVXvbiE/o20Hfw669rLbMCDWtYZLvmJigy2f0MxsXF+71pxhR3xOkspmsO8h3ZlNzyibAmtCa3l4lYKk6gQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "scheduler": "^0.27.0" }, @@ -15781,7 +15827,6 @@ "integrity": "sha512-ADu2dF53esUzzM4I0ewxhxFtsDd6v4V6dNkg3vG0iFKhnt06sJneTZnRvujAosZwW0XD58IKgGMQoqri4wHRqg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@emotion/is-prop-valid": "1.4.0", "css-to-react-native": "3.2.0", @@ -16248,7 +16293,6 @@ "integrity": "sha512-X8EX+XV4QR5xCsrgxaED954zTDfY8KqlDtskKEL0cHhyS/P8b4IFOvGDQpsC9Q1XnLq915wEfwwY/zzskCtmhg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "~0.28.0" }, @@ -16342,7 +16386,6 @@ "version": "5.9.3", "devOptional": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -16517,7 +16560,6 @@ "integrity": "sha512-h9bXPmJichP5fLmVQo3PyaGSDE2n3aPuomeAlVRm0JLmt4rY6zmPKd59HYI4LNW8oTK7tlTsuC7l/m7awx9Jcw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "lightningcss": "^1.32.0", "picomatch": "^4.0.4", @@ -16963,7 +17005,6 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-4.4.3.tgz", "integrity": "sha512-ytENFjIJFl2UwYglde2jchW2Hwm4GJFLDiSXWdTrJQBIN9Fcyp7n4DhxJEiWNAJMV1/BqWfW/kkg71UDcHJyTQ==", "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/src/app-bootstrap.ts b/src/app-bootstrap.ts index b1bc842..424676f 100644 --- a/src/app-bootstrap.ts +++ b/src/app-bootstrap.ts @@ -19,7 +19,7 @@ import { orgAnalyticsRouter } from './routes/orgAnalytics.js' import { orgMembersRouter } from './routes/orgMembers.js' import { adminRouter } from './routes/admin.js' import { adminVerifiersRouter } from './routes/adminVerifiers.js' -import { adminWebhooksRouter } from './routes/adminWebhooks.js' +import { adminWebhooksRouter, adminVaultReplayRouter } from './routes/adminWebhooks.js' import { verificationsRouter } from './routes/verifications.js' import { apiKeysRouter } from './routes/apiKeys.js' import { notificationsRouter } from './routes/notifications.js' @@ -74,6 +74,7 @@ export function bootstrapApp(options: BootstrapOptions = {}) { app.use('/api/admin', adminRouter) app.use('/api/admin/verifiers', adminVerifiersRouter) app.use('/api/admin/webhooks', adminWebhooksRouter) + app.use('/api/admin/vaults', adminVaultReplayRouter) app.use('/api/verifications', verificationsRouter) app.use('/api/api-keys', apiKeysRouter) app.use('/api/notifications', notificationsRouter) diff --git a/src/routes/adminWebhooks.ts b/src/routes/adminWebhooks.ts index 74a4ba3..fa0ac65 100644 --- a/src/routes/adminWebhooks.ts +++ b/src/routes/adminWebhooks.ts @@ -4,6 +4,8 @@ import { requireAdmin } from '../middleware/rbac.js' import { UserRole } from '../types/user.js' import { createAuditLog } from '../lib/audit-logs.js' import { db } from '../db/knex.js' +import { replayForVault } from '../services/outboxRelay.js' +import { strictRateLimiter } from '../middleware/rateLimiter.js' import { replayDeadLetter, upsertSubscriber, @@ -421,3 +423,40 @@ adminWebhooksRouter.patch( } }, ) + +export const adminVaultReplayRouter = Router() + +adminVaultReplayRouter.use(authenticate) +adminVaultReplayRouter.use(requireAdmin) +adminVaultReplayRouter.use(strictRateLimiter) + +adminVaultReplayRouter.post('/:id/replay-events', async (req: Request, res: Response) => { + try { + const vaultId = req.params.id + const { subscriber_id } = req.body ?? {} + + if (subscriber_id && typeof subscriber_id !== 'string') { + res.status(400).json({ error: 'subscriber_id must be a string' }) + return + } + + const replayedCount = await replayForVault(vaultId, subscriber_id) + + createAuditLog({ + actor_user_id: req.user!.userId, + action: 'vault.outbox.replay', + target_type: 'vault', + target_id: vaultId, + metadata: { + subscriberId: subscriber_id, + replayedCount, + }, + }) + + res.status(200).json({ replayed: true, count: replayedCount }) + } catch (error) { + console.error('Error replaying vault outbox events:', error) + res.status(500).json({ error: 'Failed to replay vault events' }) + } +}) + diff --git a/src/services/outboxRelay.ts b/src/services/outboxRelay.ts index 2898bae..1cebc68 100644 --- a/src/services/outboxRelay.ts +++ b/src/services/outboxRelay.ts @@ -88,3 +88,22 @@ export async function relayOutboxBatch(batchSize = 50): Promise { return rows.length }) } + +/** + * Replays all recorded outbox events for a single vault to an optional target subscriber. + * Preserves the original event ordering (by created_at or id asc) and does not modify the outbox state. + * Returns the number of events replayed. + */ +export async function replayForVault(vaultId: string, subscriberId?: string): Promise { + const rows = await db('vault_outbox') + .whereRaw("payload->'data'->>'vaultId' = ?", [vaultId]) + .orderBy('created_at', 'asc') + + for (const row of rows) { + const payload = typeof row.payload === 'string' ? JSON.parse(row.payload) : row.payload + await dispatchWebhookEvent(payload, subscriberId) + } + + return rows.length +} + diff --git a/src/services/webhooks.ts b/src/services/webhooks.ts index dfd1f29..5f4a586 100644 --- a/src/services/webhooks.ts +++ b/src/services/webhooks.ts @@ -713,8 +713,20 @@ const deliverOnce = async ( */ export const dispatchWebhookEvent = async ( payload: WebhookDeliveryPayload, + targetSubscriberId?: string, ): Promise => { - const eligible = await repo.findByEvent(payload.organizationId, payload.eventType) + let eligible: WebhookSubscriber[] + if (targetSubscriberId) { + const sub = await repo.findById(targetSubscriberId) + const matchesEvent = sub && (sub.events.length === 0 || sub.events.includes(payload.eventType)) + if (!sub || sub.organizationId !== payload.organizationId || !sub.active || !matchesEvent) { + eligible = [] + } else { + eligible = [sub] + } + } else { + eligible = await repo.findByEvent(payload.organizationId, payload.eventType) + } const config = getCircuitBreakerConfig() // Load org allowlist once for the whole dispatch batch (defense in depth) diff --git a/src/tests/outboxRelay.vaultReplay.test.ts b/src/tests/outboxRelay.vaultReplay.test.ts new file mode 100644 index 0000000..607f387 --- /dev/null +++ b/src/tests/outboxRelay.vaultReplay.test.ts @@ -0,0 +1,211 @@ +import { jest, describe, it, expect, beforeEach } from '@jest/globals' +import express from 'express' +import request from 'supertest' + +// ── Declare mocks BEFORE any imports ────────────────────────────────────────── +const createAuditLog = jest.fn() +jest.unstable_mockModule('../lib/audit-logs.js', () => ({ + createAuditLog, + getAuditLogById: jest.fn(), + listAuditLogs: jest.fn(), +})) + +const mockDbQuery = { + whereRaw: jest.fn().mockReturnThis(), + orderBy: jest.fn().mockImplementation(async () => []), +} + +const mockDb = jest.fn((tableName: string) => { + if (tableName === 'vault_outbox') { + return mockDbQuery + } + return mockDbQuery +}) + +jest.unstable_mockModule('../db/index.js', () => ({ + db: mockDb, +})) + +jest.unstable_mockModule('../db/knex.js', () => ({ + db: mockDb, +})) + +const dispatchWebhookEvent = jest.fn(async () => []) +jest.unstable_mockModule('../services/webhooks.js', () => ({ + dispatchWebhookEvent, + replayDeadLetter: jest.fn(), + upsertSubscriber: jest.fn(), + rotateSubscriberSecret: jest.fn(), + listSubscribers: jest.fn(async () => []), + addEgressAllowlistEntry: jest.fn(), + removeEgressAllowlistEntry: jest.fn(), + listEgressAllowlist: jest.fn(), + updateSubscriberFieldPolicy: jest.fn(), +})) + +// Mock the rate limiter to not block tests, but we can verify it is configured +jest.unstable_mockModule('../middleware/rateLimiter.js', () => ({ + strictRateLimiter: (req: any, res: any, next: any) => next(), +})) + +jest.unstable_mockModule('../middleware/auth.js', () => ({ + authenticate: jest.fn((req: any, res: any, next: any) => { + const auth = req.headers.authorization ?? '' + if (!auth.startsWith('Bearer ')) { + return res.status(401).json({ error: 'Unauthorized' }) + } + const token = auth.slice(7) + if (token === 'admin') { + req.user = { userId: 'admin-1', role: 'ADMIN' } + return next() + } + if (token === 'user') { + req.user = { userId: 'user-1', role: 'USER' } + return next() + } + return res.status(401).json({ error: 'Unauthorized' }) + }), +})) + +// ── Import modules after mocks apply ────────────────────────────────────────── +const { replayForVault } = await import('../services/outboxRelay.js') +const { adminVaultReplayRouter } = await import('../routes/adminWebhooks.js') + +const app = express() +app.use(express.json()) +app.use('/api/admin/vaults', adminVaultReplayRouter) + +describe('Vault Outbox Replay', () => { + beforeEach(() => { + jest.clearAllMocks() + }) + + describe('replayForVault service', () => { + it('returns 0 when vault has no events in the outbox', async () => { + mockDbQuery.whereRaw.mockReturnThis() + mockDbQuery.orderBy.mockResolvedValueOnce([]) + + const count = await replayForVault('vault-123') + expect(count).toBe(0) + expect(mockDbQuery.whereRaw).toHaveBeenCalledWith("payload->'data'->>'vaultId' = ?", ['vault-123']) + expect(mockDbQuery.orderBy).toHaveBeenCalledWith('created_at', 'asc') + expect(dispatchWebhookEvent).not.toHaveBeenCalled() + }) + + it('replays all events in order and returns the count', async () => { + const mockEvents = [ + { + id: 1, + payload: JSON.stringify({ + eventId: 'evt-1', + eventType: 'vault_created', + data: { vaultId: 'vault-123' }, + organizationId: 'org-1', + }), + }, + { + id: 2, + payload: { + eventId: 'evt-2', + eventType: 'vault_completed', + data: { vaultId: 'vault-123' }, + organizationId: 'org-1', + }, + }, + ] + + mockDbQuery.whereRaw.mockReturnThis() + mockDbQuery.orderBy.mockResolvedValueOnce(mockEvents) + + const count = await replayForVault('vault-123', 'sub-999') + expect(count).toBe(2) + expect(dispatchWebhookEvent).toHaveBeenCalledTimes(2) + expect(dispatchWebhookEvent).toHaveBeenNthCalledWith( + 1, + { + eventId: 'evt-1', + eventType: 'vault_created', + data: { vaultId: 'vault-123' }, + organizationId: 'org-1', + }, + 'sub-999', + ) + expect(dispatchWebhookEvent).toHaveBeenNthCalledWith( + 2, + { + eventId: 'evt-2', + eventType: 'vault_completed', + data: { vaultId: 'vault-123' }, + organizationId: 'org-1', + }, + 'sub-999', + ) + }) + }) + + describe('POST /api/admin/vaults/:id/replay-events route', () => { + it('requires admin role and returns 403 for non-admin user', async () => { + const res = await request(app) + .post('/api/admin/vaults/vault-123/replay-events') + .set('Authorization', 'Bearer user') + .send() + + expect(res.status).toBe(403) + expect(createAuditLog).not.toHaveBeenCalled() + }) + + it('requires authentication and returns 401 for unauthenticated request', async () => { + const res = await request(app) + .post('/api/admin/vaults/vault-123/replay-events') + .send() + + expect(res.status).toBe(401) + expect(createAuditLog).not.toHaveBeenCalled() + }) + + it('successfully triggers replay and writes audit log for admin', async () => { + mockDbQuery.whereRaw.mockReturnThis() + mockDbQuery.orderBy.mockResolvedValueOnce([ + { + id: 1, + payload: { + eventId: 'evt-1', + eventType: 'vault_created', + data: { vaultId: 'vault-123' }, + organizationId: 'org-1', + }, + }, + ]) + + const res = await request(app) + .post('/api/admin/vaults/vault-123/replay-events') + .set('Authorization', 'Bearer admin') + .send({ subscriber_id: 'sub-999' }) + + expect(res.status).toBe(200) + expect(res.body).toEqual({ replayed: true, count: 1 }) + + expect(createAuditLog).toHaveBeenCalledWith({ + actor_user_id: 'admin-1', + action: 'vault.outbox.replay', + target_type: 'vault', + target_id: 'vault-123', + metadata: { + subscriberId: 'sub-999', + replayedCount: 1, + }, + }) + }) + + it('returns 400 if subscriber_id is not a string', async () => { + const res = await request(app) + .post('/api/admin/vaults/vault-123/replay-events') + .set('Authorization', 'Bearer admin') + .send({ subscriber_id: 12345 }) + + expect(res.status).toBe(400) + expect(res.body).toEqual({ error: 'subscriber_id must be a string' }) + expect(createAuditLog).not.toHaveBeenCalled() + }) + }) +})