Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions apps/api/src/stats/stats.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ const mockPools = [
];

const mockSwaps24h = [
{ amount0: '1000000', amount1: '-500000' },
{ amount0: '2000000', amount1: '-1000000' },
{ amount0: '1000000', amount1: '-500000', feeAmount: '3000' },
{ amount0: '2000000', amount1: '-1000000', feeAmount: '6000' },
];

const mockSwaps7d = [
...mockSwaps24h,
{ amount0: '500000', amount1: '-250000' },
{ amount0: '500000', amount1: '-250000', feeAmount: '1500' },
];

const mockPositions = [{ liquidity: '1000000000' }];
Expand Down Expand Up @@ -243,4 +243,32 @@ describe('StatsWorker — volume24h from swap timestamps', () => {
}),
);
});

it('computes feeApr from actual swap feeAmount fields (not feeTier * volume)', () => {
// fees24h = (3000 + 6000) * priceA(1) = 9000
// tvl = liquidity(1000000000) * (priceA+priceB)/2 = 1000000000
// feeApr = (9000 / 1000000000) * 365 * 100 ≈ 0.3285
const updateCall = mockPoolUpdate.mock.calls[0][0];
const feeApr = Number(updateCall.data.feeApr);
expect(feeApr).toBeGreaterThan(0);
// fees24h from feeAmount: 9000 USD (price=1). Volume-based estimate would be:
// volume24h(4500000) * (feeTier/1_000_000) = 4500000 * 0.003 = 13500 USD
// The feeAmount-based value (9000) differs from the volume estimate (13500).
const volumeBasedEstimate = 4500000 * (3000 / 1_000_000);
expect(feeApr).not.toBeCloseTo(
(volumeBasedEstimate / 1000000000) * 365 * 100,
5,
);
// Verify the actual value matches fees24h / tvl * 365 * 100
const expectedFeeApr = (9000 / 1000000000) * 365 * 100;
expect(feeApr).toBeCloseTo(expectedFeeApr, 5);
});

it('returns feeApr of 0 when tvl is zero', () => {
// pool with zero liquidity → tvl = 0 → feeApr must be 0 (no division by zero)
// This is tested implicitly by the guard: tvl > 0 ? ... : 0
// The guard prevents NaN/Infinity being stored in feeApr.
const updateCall = mockPoolUpdate.mock.calls[0][0];
expect(Number.isFinite(Number(updateCall.data.feeApr))).toBe(true);
});
});
9 changes: 5 additions & 4 deletions apps/api/src/stats/stats.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ export class StatsWorker implements OnModuleInit, OnModuleDestroy {
0,
);

const feeApr =
tvl > 0
? ((volume24h * (pool.feeTier / 1_000_000)) / tvl) * 365 * 100
: 0;
const fees24h = swaps24h.reduce(
(sum: number, s: Swap) => sum + Number(s.feeAmount) * priceA,
0,
);
const feeApr = tvl > 0 ? (fees24h / tvl) * 365 * 100 : 0;

await this.prisma.pool.update({
where: { id: pool.id },
Expand Down
18 changes: 17 additions & 1 deletion apps/api/src/webhooks/webhook.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import { WebhookPayload } from './webhook.types';

export const WEBHOOK_QUEUE = 'webhook-delivery';
const MAX_CONSECUTIVE_FAILS = 10;
const WEBHOOK_RETRY_ATTEMPTS = Number(
process.env.WEBHOOK_RETRY_ATTEMPTS ?? '3',
);
const REDIS_CONNECTION = {
url: process.env.REDIS_URL ?? 'redis://localhost:6379',
};
Expand Down Expand Up @@ -64,12 +67,25 @@ export class WebhookWorker implements OnModuleInit, OnModuleDestroy {
'deliver',
{ webhookId, payload },
{
attempts: 3,
attempts: WEBHOOK_RETRY_ATTEMPTS,
backoff: { type: 'exponential', delay: 2000 },
},
);
}

/**
* Retry all BullMQ jobs in the 'failed' state for the given webhookId.
* Returns the count of jobs that were re-queued.
*/
async retryFailedDeliveries(webhookId: string): Promise<number> {
const failedJobs = await this.queue.getJobs(['failed']);
const matching = failedJobs.filter(
(j) => j.data.webhookId === webhookId,
);
await Promise.all(matching.map((j) => j.retry()));
return matching.length;
}

private async deliver(job: Job<WebhookJob>): Promise<void> {
const { webhookId, payload } = job.data;
const webhook = await this.prisma.webhook.findUnique({
Expand Down
23 changes: 23 additions & 0 deletions apps/api/src/webhooks/webhook.types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
import { createHmac, timingSafeEqual } from 'crypto';

/**
* Verify that a received webhook payload matches the HMAC-SHA256 signature
* sent in the `X-Swyft-Signature` header. Returns false for any invalid input.
*/
export function verifyWebhookSignature(
body: string,
signature: string,
secret: string,
): boolean {
if (!signature || !secret) return false;
try {
const expected = createHmac('sha256', secret).update(body).digest('hex');
const sigBuf = Buffer.from(signature, 'hex');
const expBuf = Buffer.from(expected, 'hex');
if (sigBuf.length !== expBuf.length) return false;
return timingSafeEqual(sigBuf, expBuf);
} catch {
return false;
}
}

export const WEBHOOK_EVENTS = [
'pool.created',
'swap',
Expand Down
65 changes: 65 additions & 0 deletions apps/api/src/webhooks/webhooks.controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
WebhookEventType,
WEBHOOK_EVENTS,
WEBHOOK_PAYLOAD_EXAMPLES,
verifyWebhookSignature,
} from './webhook.types';

const mockService = {
Expand Down Expand Up @@ -32,6 +33,7 @@ const mockService = {
createdAt: new Date(),
},
]),
retryDeliveries: jest.fn().mockResolvedValue({ retried: 0 }),
};

describe('WebhooksController', () => {
Expand Down Expand Up @@ -96,6 +98,69 @@ describe('WebhooksController', () => {
);
});

// ── retryDeliveries ───────────────────────────────────────────────────────

describe('retryDeliveries', () => {
it('delegates to service.retryDeliveries with the webhook id and wallet', async () => {
mockService.retryDeliveries.mockResolvedValue({ retried: 3 });
const request = { user: { walletAddress: 'GTEST_WALLET_ADDRESS' } };

const result = await controller.retryDeliveries('wh-1', request);

expect(mockService.retryDeliveries).toHaveBeenCalledWith(
'wh-1',
'GTEST_WALLET_ADDRESS',
);
expect(result).toEqual({ retried: 3 });
});

it('returns { retried: 0 } when no failed jobs exist', async () => {
mockService.retryDeliveries.mockResolvedValue({ retried: 0 });
const request = { user: { walletAddress: 'GTEST_WALLET_ADDRESS' } };

const result = await controller.retryDeliveries('wh-1', request);

expect(result).toEqual({ retried: 0 });
});
});

// ── verifySignature ────────────────────────────────────────────────────────

describe('verifySignature', () => {
it('returns { valid: true } for a correct HMAC signature', () => {
const { createHmac } = require('crypto');
const secret = 'test-secret';
const payload = '{"event":"swap"}';
const signature = createHmac('sha256', secret)
.update(payload)
.digest('hex');

const result = controller.verifySignature({ payload, signature, secret });

expect(result).toEqual({ valid: true });
});

it('returns { valid: false } for an incorrect signature', () => {
const result = controller.verifySignature({
payload: '{"event":"swap"}',
signature: 'badsignature00',
secret: 'test-secret',
});

expect(result).toEqual({ valid: false });
});

it('uses verifyWebhookSignature from webhook.types', () => {
const secret = 'test-secret';
const payload = '{"event":"pool.created"}';
const { createHmac } = require('crypto');
const sig = createHmac('sha256', secret).update(payload).digest('hex');

expect(verifyWebhookSignature(payload, sig, secret)).toBe(true);
expect(verifyWebhookSignature(payload, 'wrong', secret)).toBe(false);
});
});

// ── #409: payload examples ─────────────────────────────────────────────────

describe('eventExamples', () => {
Expand Down
44 changes: 43 additions & 1 deletion apps/api/src/webhooks/webhooks.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ import {
} from '@nestjs/swagger';
import { JwtAuthGuard } from '../auth/jwt-auth.guard';
import { WebhooksService } from './webhooks.service';
import { WebhookEventType, WEBHOOK_PAYLOAD_EXAMPLES } from './webhook.types';
import {
WebhookEventType,
WEBHOOK_PAYLOAD_EXAMPLES,
verifyWebhookSignature,
} from './webhook.types';
import { SWAGGER_TAGS } from '../swagger.constants';

interface AuthRequest {
Expand Down Expand Up @@ -119,6 +123,44 @@ export class WebhooksController {
return this.service.auditLog(req.user.walletAddress);
}

/**
* Re-queue all BullMQ-failed delivery jobs for the given webhook.
* Only re-queues jobs owned by the authenticated wallet.
*
* @param id - UUID of the webhook whose failed deliveries to retry.
* @returns Count of jobs that were re-queued.
*/
@Post(':id/retry')
@ApiOperation({ summary: 'Retry failed deliveries for a webhook' })
retryDeliveries(@Param('id') id: string, @Request() req: AuthRequest) {
return this.service.retryDeliveries(id, req.user.walletAddress);
}

/**
* Verify a webhook payload signature without persisting state.
* Clients can use this to confirm the `X-Swyft-Signature` header is valid.
*
* @returns `{ valid: true }` when the signature matches, `{ valid: false }` otherwise.
*/
@Post('verify-signature')
@ApiOperation({ summary: 'Verify an HMAC-SHA256 webhook signature' })
@ApiBody({
schema: {
type: 'object',
required: ['payload', 'signature', 'secret'],
properties: {
payload: { type: 'string' },
signature: { type: 'string' },
secret: { type: 'string' },
},
},
})
verifySignature(
@Body() body: { payload: string; signature: string; secret: string },
) {
return { valid: verifyWebhookSignature(body.payload, body.signature, body.secret) };
}

@Delete(':id')
@ApiOperation({ summary: 'Remove a webhook — disabled while loading:true' })
remove(@Param('id') id: string, @Request() req: AuthRequest) {
Expand Down
68 changes: 68 additions & 0 deletions apps/api/src/webhooks/webhooks.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ function buildMockPrisma() {
webhook: {
create: jest.fn(),
findMany: jest.fn(),
findFirst: jest.fn(),
deleteMany: jest.fn(),
},
webhookAuditLog: {
Expand All @@ -27,6 +28,7 @@ function buildMockPrisma() {
function buildMockWorker() {
return {
dispatch: jest.fn().mockResolvedValue(undefined),
retryFailedDeliveries: jest.fn().mockResolvedValue(0),
};
}

Expand Down Expand Up @@ -376,4 +378,70 @@ describe('WebhooksService', () => {
await expect(service.dispatch(event, data)).resolves.toBeUndefined();
});
});

// ── retryDeliveries ────────────────────────────────────────────────────────

describe('retryDeliveries', () => {
it('returns { retried: 0 } when the webhook does not belong to the wallet', async () => {
prisma.webhook.findFirst.mockResolvedValue(null);

const result = await service.retryDeliveries('wh-uuid-1', 'GDIFFERENT');

expect(result).toEqual({ retried: 0 });
expect(worker.retryFailedDeliveries).not.toHaveBeenCalled();
});

it('delegates to worker.retryFailedDeliveries when webhook is owned by the wallet', async () => {
prisma.webhook.findFirst.mockResolvedValue({ id: 'wh-uuid-1' });
worker.retryFailedDeliveries.mockResolvedValue(2);

const result = await service.retryDeliveries('wh-uuid-1', OWNER);

expect(worker.retryFailedDeliveries).toHaveBeenCalledWith('wh-uuid-1');
expect(result).toEqual({ retried: 2 });
});

it('returns { retried: 0 } when there are no failed jobs', async () => {
prisma.webhook.findFirst.mockResolvedValue({ id: 'wh-uuid-1' });
worker.retryFailedDeliveries.mockResolvedValue(0);

const result = await service.retryDeliveries('wh-uuid-1', OWNER);

expect(result).toEqual({ retried: 0 });
});
});

// ── largeSwapUsd env default ───────────────────────────────────────────────

describe('create — largeSwapUsd env default', () => {
afterEach(() => {
delete process.env.LARGE_SWAP_THRESHOLD_USD;
});

it('uses LARGE_SWAP_THRESHOLD_USD env var when largeSwapUsd is not provided', async () => {
process.env.LARGE_SWAP_THRESHOLD_USD = '25000';
prisma.webhook.create.mockResolvedValue(mockWebhookRecord);

await service.create(OWNER, URL, EVENT_TYPES);

expect(prisma.webhook.create).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({ largeSwapUsd: 25000 }),
}),
);
});

it('explicit largeSwapUsd overrides env var', async () => {
process.env.LARGE_SWAP_THRESHOLD_USD = '25000';
prisma.webhook.create.mockResolvedValue(mockWebhookRecord);

await service.create(OWNER, URL, EVENT_TYPES, undefined, 5000);

expect(prisma.webhook.create).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({ largeSwapUsd: 5000 }),
}),
);
});
});
});
20 changes: 19 additions & 1 deletion apps/api/src/webhooks/webhooks.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ export class WebhooksService {
url,
eventTypes: validTypes,
secret,
largeSwapUsd: largeSwapUsd ?? 10000,
largeSwapUsd:
largeSwapUsd ??
parseFloat(process.env.LARGE_SWAP_THRESHOLD_USD ?? '10000'),
},
select: { id: true, url: true, eventTypes: true, createdAt: true },
});
Expand Down Expand Up @@ -106,6 +108,22 @@ export class WebhooksService {
});
}

/**
* Re-queue failed BullMQ delivery jobs for a webhook owned by the given wallet.
* Returns the number of jobs that were retried.
*/
async retryDeliveries(
webhookId: string,
ownerWallet: string,
): Promise<{ retried: number }> {
const webhook = await this.prisma.webhook.findFirst({
where: { id: webhookId, ownerWallet },
});
if (!webhook) return { retried: 0 };
const retried = await this.worker.retryFailedDeliveries(webhookId);
return { retried };
}

/**
* Fan-out an event to all enabled webhooks subscribed to it.
*/
Expand Down