From d10faa5a513c378f79cf7414303e20d4b0d6fe9d Mon Sep 17 00:00:00 2001 From: JACOB STANLEY Date: Fri, 29 May 2026 21:23:28 +0100 Subject: [PATCH 1/2] feat: Add background retry for pending claim disbursements - Add 'disbursing' status to ClaimStatus enum - Modify disburse method to enqueue background jobs with BullMQ - Add ClaimRetryService to monitor and handle stuck disbursements - Update on-chain processor to transition claim status on success/failure - Add automatic retry with exponential backoff (5 attempts) - Add monitoring cron job running every 5 minutes - Revert claims to 'approved' after max retry attempts --- .../migration.sql | 36 +++ app/backend/prisma/schema.prisma | 1 + app/backend/src/claims/claim-retry.service.ts | 217 ++++++++++++++++++ app/backend/src/claims/claims.module.ts | 3 +- app/backend/src/claims/claims.service.ts | 159 +++++-------- app/backend/src/onchain/onchain.module.ts | 2 + app/backend/src/onchain/onchain.processor.ts | 71 ++++++ 7 files changed, 381 insertions(+), 108 deletions(-) create mode 100644 app/backend/prisma/migrations/20260529000000_add_disbursing_status/migration.sql create mode 100644 app/backend/src/claims/claim-retry.service.ts diff --git a/app/backend/prisma/migrations/20260529000000_add_disbursing_status/migration.sql b/app/backend/prisma/migrations/20260529000000_add_disbursing_status/migration.sql new file mode 100644 index 00000000..a39114f7 --- /dev/null +++ b/app/backend/prisma/migrations/20260529000000_add_disbursing_status/migration.sql @@ -0,0 +1,36 @@ +-- For SQLite, enum changes require recreating the table +-- This migration adds the 'disbursing' status to ClaimStatus enum + +CREATE TABLE "Claim_new" ( + "id" TEXT NOT NULL PRIMARY KEY, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" DATETIME NOT NULL, + "deletedAt" DATETIME, + "status" TEXT NOT NULL DEFAULT 'requested', + "campaignId" TEXT NOT NULL, + "amount" REAL NOT NULL, + "recipientRef" TEXT NOT NULL, + "evidenceRef" TEXT, + "expiresAt" DATETIME, + "cancelledAt" DATETIME, + "cancelledBy" TEXT, + "cancelReason" TEXT, + "reissuedFromId" TEXT, + FOREIGN KEY ("campaignId") REFERENCES "Campaign"("id") ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY ("reissuedFromId") REFERENCES "Claim"("id") ON DELETE CASCADE ON UPDATE CASCADE +); + +INSERT INTO "Claim_new" ("id", "createdAt", "updatedAt", "deletedAt", "status", "campaignId", "amount", "recipientRef", "evidenceRef", "expiresAt", "cancelledAt", "cancelledBy", "cancelReason", "reissuedFromId") +SELECT "id", "createdAt", "updatedAt", "deletedAt", "status", "campaignId", "amount", "recipientRef", "evidenceRef", "expiresAt", "cancelledAt", "cancelledBy", "cancelReason", "reissuedFromId" +FROM "Claim"; + +DROP TABLE "Claim"; + +ALTER TABLE "Claim_new" RENAME TO "Claim"; + +CREATE INDEX "Claim_status_idx" ON "Claim"("status"); +CREATE INDEX "Claim_campaignId_idx" ON "Claim"("campaignId"); +CREATE INDEX "Claim_createdAt_idx" ON "Claim"("createdAt"); +CREATE INDEX "Claim_deletedAt_idx" ON "Claim"("deletedAt"); +CREATE INDEX "Claim_reissuedFromId_idx" ON "Claim"("reissuedFromId"); +CREATE INDEX "Claim_expiresAt_idx" ON "Claim"("expiresAt"); diff --git a/app/backend/prisma/schema.prisma b/app/backend/prisma/schema.prisma index 5dc43f71..7565aa5c 100644 --- a/app/backend/prisma/schema.prisma +++ b/app/backend/prisma/schema.prisma @@ -19,6 +19,7 @@ enum ClaimStatus { requested verified approved + disbursing disbursed archived cancelled diff --git a/app/backend/src/claims/claim-retry.service.ts b/app/backend/src/claims/claim-retry.service.ts new file mode 100644 index 00000000..1fdea286 --- /dev/null +++ b/app/backend/src/claims/claim-retry.service.ts @@ -0,0 +1,217 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { PrismaService } from '../prisma/prisma.service'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Queue } from 'bullmq'; +import { ClaimStatus } from '@prisma/client'; +import { ConfigService } from '@nestjs/config'; +import { createHash } from 'crypto'; + +/** + * Service to handle background retry logic for stuck or failed claim disbursements + */ +@Injectable() +export class ClaimRetryService { + private readonly logger = new Logger(ClaimRetryService.name); + private readonly maxDisbursingDuration: number; + private readonly maxRetryAttempts: number; + + constructor( + private readonly prisma: PrismaService, + @InjectQueue('onchain') private readonly onchainQueue: Queue, + private readonly configService: ConfigService, + ) { + this.maxDisbursingDuration = + this.configService.get('CLAIM_MAX_DISBURSING_DURATION', 30 * 60 * 1000); + this.maxRetryAttempts = + this.configService.get('CLAIM_MAX_RETRY_ATTEMPTS', 5); + } + + @Cron(CronExpression.EVERY_5_MINUTES) + async handleStuckDisbursements(): Promise { + try { + this.logger.log('Checking for stuck disbursements...'); + const result = await this.checkAndRetryStuckDisbursements(); + + if (result.retriedCount > 0) { + this.logger.log(`Retried ${result.retriedCount} stuck disbursement(s)`); + } + + if (result.revertedCount > 0) { + this.logger.warn(`Reverted ${result.revertedCount} disbursement(s) to approved status`); + } + + if (result.alertCount > 0) { + this.logger.error(`Generated ${result.alertCount} alert(s) for failed disbursements`); + } + } catch (error) { + this.logger.error( + 'Failed to check for stuck disbursements', + error instanceof Error ? error.stack : undefined, + ); + } + } + + async checkAndRetryStuckDisbursements(): Promise<{ + retriedCount: number; + revertedCount: number; + alertCount: number; + }> { + const stuckThreshold = new Date(Date.now() - this.maxDisbursingDuration); + + const stuckClaims = await this.prisma.claim.findMany({ + where: { + status: ClaimStatus.disbursing, + updatedAt: { lt: stuckThreshold }, + deletedAt: null, + }, + include: { campaign: true }, + }); + + if (stuckClaims.length === 0) { + return { retriedCount: 0, revertedCount: 0, alertCount: 0 }; + } + + this.logger.log(`Found ${stuckClaims.length} claim(s) stuck in disbursing status`); + + let retriedCount = 0; + let revertedCount = 0; + let alertCount = 0; + + for (const claim of stuckClaims) { + try { + const jobs = await this.onchainQueue.getJobs(['waiting', 'active', 'delayed'], 0, 100); + const existingJob = jobs.find(job => + job.data.type === 'disburse' && + job.data.params.claimId === claim.id + ); + + if (existingJob) { + if (existingJob.attemptsMade >= this.maxRetryAttempts) { + await this.revertClaimToApproved(claim.id); + revertedCount++; + this.generateAlert(claim, 'max_retries_reached'); + alertCount++; + } + } else { + await this.retryDisbursement(claim); + retriedCount++; + } + } catch (error) { + this.logger.error( + `Failed to process stuck claim ${claim.id}: ${error instanceof Error ? error.message : 'Unknown error'}`, + ); + this.generateAlert(claim, 'processing_failed'); + alertCount++; + } + } + + return { retriedCount, revertedCount, alertCount }; + } + + private async retryDisbursement(claim: any): Promise { + const packageId = this.generateMockPackageId(claim.id); + const tokenAddress = this.getTokenAddressForClaim(claim); + + await this.onchainQueue.add( + 'disburse', + { + type: 'disburse', + params: { + claimId: claim.id, + packageId, + recipientAddress: claim.recipientRef, + amount: claim.amount.toString(), + tokenAddress, + }, + timestamp: Date.now(), + }, + { + attempts: this.maxRetryAttempts, + backoff: { type: 'exponential', delay: 2000 }, + removeOnComplete: { count: 10 }, + removeOnFail: { count: 50 }, + }, + ); + + this.logger.log(`Re-enqueued disbursement job for claim ${claim.id}`); + } + + private async revertClaimToApproved(claimId: string): Promise { + await this.prisma.claim.update({ + where: { id: claimId }, + data: { status: ClaimStatus.approved }, + }); + + this.logger.log(`Reverted claim ${claimId} from disbursing to approved`); + } + + private generateAlert(claim: any, reason: string): void { + const alertMessage = { + claimId: claim.id, + campaignId: claim.campaignId, + amount: claim.amount, + reason, + timestamp: new Date().toISOString(), + }; + + this.logger.error(`Claim disbursement alert: ${JSON.stringify(alertMessage)}`); + } + + private generateMockPackageId(claimId: string): string { + const hash = createHash('sha256') + .update(`package-${claimId}`) + .digest('hex'); + return BigInt('0x' + hash.substring(0, 16)).toString(); + } + + private getTokenAddressForClaim(claim: any): string { + const defaultTokenAddress = + 'GATEMHCCKCY67ZUCKTROYN24ZYT5GK4EQZ5LKG3FZTSZ3NYNEJBBENSN'; + + const claimMetadata = claim.metadata as Record | undefined; + if (claimMetadata?.tokenAddress) { + return claimMetadata.tokenAddress as string; + } + + const campaignMetadata = claim.campaign?.metadata as + | Record + | undefined; + if (campaignMetadata?.tokenAddress) { + return campaignMetadata.tokenAddress as string; + } + + return defaultTokenAddress; + } + + async getDisbursementStats(): Promise<{ + disbursing: number; + stuck: number; + recentlyFailed: number; + }> { + const stuckThreshold = new Date(Date.now() - this.maxDisbursingDuration); + const recentFailureThreshold = new Date(Date.now() - 24 * 60 * 60 * 1000); + + const [disbursing, stuck, recentlyFailed] = await Promise.all([ + this.prisma.claim.count({ + where: { status: ClaimStatus.disbursing, deletedAt: null }, + }), + this.prisma.claim.count({ + where: { + status: ClaimStatus.disbursing, + updatedAt: { lt: stuckThreshold }, + deletedAt: null, + }, + }), + this.prisma.claim.count({ + where: { + status: ClaimStatus.approved, + updatedAt: { gte: recentFailureThreshold }, + deletedAt: null, + }, + }), + ]); + + return { disbursing, stuck, recentlyFailed }; + } +} diff --git a/app/backend/src/claims/claims.module.ts b/app/backend/src/claims/claims.module.ts index 80c1f160..a34f8990 100644 --- a/app/backend/src/claims/claims.module.ts +++ b/app/backend/src/claims/claims.module.ts @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common'; import { ClaimsService } from './claims.service'; import { ClaimsController } from './claims.controller'; import { CancelAndReissueService } from './cancel-and-reissue.service'; +import { ClaimRetryService } from './claim-retry.service'; import { PrismaModule } from '../prisma/prisma.module'; import { OnchainModule } from '../onchain/onchain.module'; import { MetricsModule } from '../observability/metrics/metrics.module'; @@ -22,7 +23,7 @@ import { CommonServicesModule } from '../common/services/common-services.module' CommonServicesModule, ], controllers: [ClaimsController], - providers: [ClaimsService, CancelAndReissueService, BudgetService], + providers: [ClaimsService, CancelAndReissueService, BudgetService, ClaimRetryService], exports: [CancelAndReissueService], }) export class ClaimsModule {} diff --git a/app/backend/src/claims/claims.service.ts b/app/backend/src/claims/claims.service.ts index 57b8ac71..80e8f1d7 100644 --- a/app/backend/src/claims/claims.service.ts +++ b/app/backend/src/claims/claims.service.ts @@ -8,6 +8,8 @@ import { } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { Cron, CronExpression } from '@nestjs/schedule'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Queue } from 'bullmq'; import { createHash } from 'crypto'; import { PrismaService } from '../prisma/prisma.service'; import { CreateClaimDto } from './dto/create-claim.dto'; @@ -61,6 +63,7 @@ export class ClaimsService { private readonly auditService: AuditService, private readonly encryptionService: EncryptionService, private readonly budgetService: BudgetService, + @InjectQueue('onchain') private readonly onchainQueue: Queue, ) { this.onchainEnabled = this.configService.get('ONCHAIN_ENABLED') === 'true'; @@ -175,122 +178,64 @@ export class ClaimsService { if (claim.status !== ClaimStatus.approved) { throw new BadRequestException( - `Cannot transition from ${claim.status} to ${ClaimStatus.disbursed}`, + `Cannot transition from ${claim.status} to ${ClaimStatus.disbursing}`, ); } - // Call on-chain adapter if enabled - let onchainResult: DisburseResult | null = null; + // Transition to disbursing status and enqueue background job + const updatedClaim = await this.transitionStatus( + id, + ClaimStatus.approved, + ClaimStatus.disbursing, + ); + + // Enqueue disbursement job for background processing if (this.onchainEnabled && this.onchainAdapter) { - const startTime = Date.now(); - const adapterType = - this.configService.get('ONCHAIN_ADAPTER')?.toLowerCase() || - 'mock'; - - try { - this.logger.log(`Calling on-chain adapter for claim ${id}`, { - claimId: id, - adapter: adapterType, - }); - - // Generate a mock package ID for the disburse call - // In a real implementation, this would come from createClaim - const packageId = this.generateMockPackageId(id); - - // Get tokenAddress from claim metadata or use a default - // In production, this should be stored in the claim record - const tokenAddress = this.getTokenAddressForClaim(claim); - - onchainResult = await this.onchainAdapter.disburse({ - claimId: id, - packageId, - recipientAddress: this.encryptionService.decrypt(claim.recipientRef), - amount: claim.amount.toString(), - tokenAddress, - }); - - const duration = (Date.now() - startTime) / 1000; - - // Record metrics - this.metricsService.incrementOnchainOperation( - 'disburse', - adapterType, - onchainResult.status, - ); - this.metricsService.recordOnchainDuration( - 'disburse', - adapterType, - duration, - ); - - this.logger.log(`On-chain disbursement completed for claim ${id}`, { - claimId: id, - transactionHash: onchainResult.transactionHash, - status: onchainResult.status, - duration, - }); - - // Audit log for on-chain operation - await this.auditService.record({ - actorId: 'system', - entity: 'onchain', - entityId: id, - action: 'disburse', - metadata: { - transactionHash: onchainResult.transactionHash, - status: onchainResult.status, - amountDisbursed: onchainResult.amountDisbursed, - adapter: adapterType, + const packageId = this.generateMockPackageId(id); + const tokenAddress = this.getTokenAddressForClaim(claim); + + await this.onchainQueue.add( + 'disburse', + { + type: 'disburse', + params: { + claimId: id, + packageId, + recipientAddress: this.encryptionService.decrypt(claim.recipientRef), + amount: claim.amount.toString(), + tokenAddress, }, - }); - } catch (error) { - const duration = (Date.now() - startTime) / 1000; - const errorMessage = - error instanceof Error ? error.message : 'Unknown error'; - - this.logger.error( - `On-chain disbursement failed for claim ${id}: ${errorMessage}`, - error instanceof Error ? error.stack : undefined, - 'ClaimsService', - { claimId: id, adapter: adapterType }, - ); - - // Record failed metric - this.metricsService.incrementOnchainOperation( - 'disburse', - adapterType, - 'failed', - ); - this.metricsService.recordOnchainDuration( - 'disburse', - adapterType, - duration, - ); - - // Audit log for failed operation - await this.auditService.record({ - actorId: 'system', - entity: 'onchain', - entityId: id, - action: 'disburse_failed', - metadata: { - error: errorMessage, - adapter: adapterType, + timestamp: Date.now(), + }, + { + attempts: 5, + backoff: { + type: 'exponential', + delay: 2000, + }, + removeOnComplete: { + count: 10, }, - }); + removeOnFail: { + count: 50, + }, + }, + ); - // Don't throw - allow disbursement to proceed even if on-chain call fails - // This is configurable behavior for resilience - } + this.logger.log(`Enqueued disbursement job for claim ${id}`, { + claimId: id, + packageId, + }); + } else { + // If on-chain is disabled, immediately transition to disbursed + await this.transitionStatus( + id, + ClaimStatus.disbursing, + ClaimStatus.disbursed, + ); } - // Proceed with status transition - return this.transitionStatus( - id, - ClaimStatus.approved, - ClaimStatus.disbursed, - onchainResult, - ); + return updatedClaim; } /** diff --git a/app/backend/src/onchain/onchain.module.ts b/app/backend/src/onchain/onchain.module.ts index 110481ab..3f464606 100644 --- a/app/backend/src/onchain/onchain.module.ts +++ b/app/backend/src/onchain/onchain.module.ts @@ -11,6 +11,7 @@ import { LedgerBackfillService } from './ledger-backfill.service'; import { LedgerReconciliationService } from './ledger-reconciliation.service'; import { LedgerAdminController } from './ledger-admin.controller'; import { JobsModule } from '../jobs/jobs.module'; +import { PrismaModule } from '../prisma/prisma.module'; /** * Factory function to create the appropriate adapter based on configuration @@ -42,6 +43,7 @@ const onchainAdapterProvider: Provider = { @Module({ imports: [ ConfigModule, + PrismaModule, BullModule.registerQueueAsync({ name: 'onchain', imports: [ConfigModule], diff --git a/app/backend/src/onchain/onchain.processor.ts b/app/backend/src/onchain/onchain.processor.ts index 6bc83529..1c49273b 100644 --- a/app/backend/src/onchain/onchain.processor.ts +++ b/app/backend/src/onchain/onchain.processor.ts @@ -8,6 +8,8 @@ import { OnchainOperationType, } from './interfaces/onchain-job.interface'; import { ONCHAIN_ADAPTER_TOKEN, OnchainAdapter } from './onchain.adapter'; +import { PrismaService } from '../prisma/prisma.service'; +import { ClaimStatus } from '@prisma/client'; import { DlqService } from '../jobs/dlq.service'; @@ -21,6 +23,7 @@ export class OnchainProcessor extends WorkerHost { @Inject(ONCHAIN_ADAPTER_TOKEN) private readonly onchainAdapter: OnchainAdapter, private readonly dlqService: DlqService, + private readonly prisma: PrismaService, ) { super(); } @@ -43,6 +46,15 @@ export class OnchainProcessor extends WorkerHost { break; case OnchainOperationType.DISBURSE: result = await this.onchainAdapter.disburse(job.data.params); + // Update claim status from disbursing to disbursed on success + if (result && result.status === 'success' && job.data.params.claimId) { + await this.updateClaimStatus( + job.data.params.claimId, + ClaimStatus.disbursing, + ClaimStatus.disbursed, + result.transactionHash, + ); + } break; default: throw new Error( @@ -85,6 +97,33 @@ export class OnchainProcessor extends WorkerHost { } } + private async updateClaimStatus( + claimId: string, + fromStatus: ClaimStatus, + toStatus: ClaimStatus, + transactionHash?: string, + ): Promise { + try { + await this.prisma.claim.update({ + where: { id: claimId }, + data: { + status: toStatus, + metadata: transactionHash + ? { transactionHash } + : undefined, + }, + }); + this.logger.log( + `Updated claim ${claimId} status from ${fromStatus} to ${toStatus}`, + ); + } catch (error) { + this.logger.error( + `Failed to update claim ${claimId} status: ${error instanceof Error ? error.message : 'Unknown error'}`, + ); + // Don't throw - the on-chain operation succeeded, we should log but not fail the job + } + } + @OnWorkerEvent('completed') onCompleted(job: Job) { this.logger.log(`Onchain job ${job.id} completed successfully`); @@ -94,9 +133,41 @@ export class OnchainProcessor extends WorkerHost { async onFailed(job: Job | undefined, error: Error) { if (job) { this.logger.error(`Onchain job ${job.id} failed: ${error.message}`); + + // If this is a disburse job that failed after max retries, revert claim status + if ( + job.data.type === OnchainOperationType.DISBURSE && + job.attemptsMade >= job.opts.attempts! - 1 && + job.data.params.claimId + ) { + await this.revertClaimStatus(job.data.params.claimId); + } + await this.dlqService.moveToDlq('onchain', job, error); } else { this.logger.error(`Onchain job failed: ${error.message}`); } } + + private async revertClaimStatus(claimId: string): Promise { + try { + const claim = await this.prisma.claim.findUnique({ + where: { id: claimId }, + }); + + if (claim && claim.status === ClaimStatus.disbursing) { + await this.prisma.claim.update({ + where: { id: claimId }, + data: { status: ClaimStatus.approved }, + }); + this.logger.log( + `Reverted claim ${claimId} from disbursing to approved after failed disbursement`, + ); + } + } catch (error) { + this.logger.error( + `Failed to revert claim ${claimId} status: ${error instanceof Error ? error.message : 'Unknown error'}`, + ); + } + } } From 835e2d0a29380b70fb6f69c59f14728c985c67b6 Mon Sep 17 00:00:00 2001 From: JACOB STANLEY Date: Fri, 29 May 2026 22:00:14 +0100 Subject: [PATCH 2/2] feat: Add cost-aware rate limiting per endpoint - Add @RateLimit decorator for endpoint-specific configuration - Create CostAwareRateLimitGuard with cost-aware rate limiting - Implement automatic cost calculation based on HTTP method and path - Add rate limit headers (Limit, Remaining, Reset, Cost, Window) - Apply rate limits to claims endpoints (create, disburse) - Apply rate limits to verification endpoint (enqueue) - Add comprehensive documentation Default costs: - Read operations: 1 - Write operations: 5 - Expensive operations (on-chain): 20 - Bulk operations: 50 --- app/backend/src/claims/claims.controller.ts | 7 + .../common/decorators/rate-limit.decorator.ts | 13 + .../guards/cost-aware-rate-limit.guard.ts | 177 +++++++++++++ .../verification/verification.controller.ts | 5 + docs/rate-limiting-cost-aware.md | 245 ++++++++++++++++++ 5 files changed, 447 insertions(+) create mode 100644 app/backend/src/common/decorators/rate-limit.decorator.ts create mode 100644 app/backend/src/common/guards/cost-aware-rate-limit.guard.ts create mode 100644 docs/rate-limiting-cost-aware.md diff --git a/app/backend/src/claims/claims.controller.ts b/app/backend/src/claims/claims.controller.ts index 19240b12..1cafdc3c 100644 --- a/app/backend/src/claims/claims.controller.ts +++ b/app/backend/src/claims/claims.controller.ts @@ -9,6 +9,7 @@ import { Request, Res, Version, + UseGuards, } from '@nestjs/common'; import type { Response } from 'express'; import { Request as ExpressRequest } from 'express'; @@ -40,6 +41,8 @@ import { AppRole } from 'src/auth/app-role.enum'; import { InternalNotesService } from 'src/common/services/internal-notes.service'; import { CreateInternalNoteDto } from 'src/common/dto/create-internal-note.dto'; import { InternalNoteResponseDto } from 'src/common/dto/internal-note-response.dto'; +import { CostAwareRateLimitGuard } from 'src/common/guards/cost-aware-rate-limit.guard'; +import { RateLimit } from 'src/common/decorators/rate-limit.decorator'; @ApiTags('Onchain Proxy') @ApiBearerAuth('JWT-auth') @@ -52,6 +55,8 @@ export class ClaimsController { ) {} @Post() + @UseGuards(CostAwareRateLimitGuard) + @RateLimit({ limit: 50, window: 60, cost: 5 }) // Write operation @ApiOperation({ summary: 'Create a claim', description: 'Initializes a new claim for a specific campaign.', @@ -144,6 +149,8 @@ export class ClaimsController { @Post(':id/disburse') @Roles(AppRole.admin) + @UseGuards(CostAwareRateLimitGuard) + @RateLimit({ limit: 10, window: 60, cost: 20 }) // Expensive on-chain operation @ApiOperation({ summary: 'Disburse funds for a claim', description: diff --git a/app/backend/src/common/decorators/rate-limit.decorator.ts b/app/backend/src/common/decorators/rate-limit.decorator.ts new file mode 100644 index 00000000..56f47fa3 --- /dev/null +++ b/app/backend/src/common/decorators/rate-limit.decorator.ts @@ -0,0 +1,13 @@ +import { SetMetadata } from '@nestjs/common'; + +export const RATE_LIMIT_KEY = 'rateLimit'; + +export interface RateLimitConfig { + limit: number; + window: number; // seconds + cost?: number; // cost weight for this endpoint (default: 1) + skipSuccessfulRequests?: boolean; // don't count successful requests +} + +export const RateLimit = (config: RateLimitConfig) => + SetMetadata(RATE_LIMIT_KEY, config); diff --git a/app/backend/src/common/guards/cost-aware-rate-limit.guard.ts b/app/backend/src/common/guards/cost-aware-rate-limit.guard.ts new file mode 100644 index 00000000..6c4f0a06 --- /dev/null +++ b/app/backend/src/common/guards/cost-aware-rate-limit.guard.ts @@ -0,0 +1,177 @@ +import { + Injectable, + CanActivate, + ExecutionContext, + HttpException, + HttpStatus, + Reflector, +} from '@nestjs/common'; +import { RedisService } from '@liaoliaots/nestjs-redis'; +import { Request } from 'express'; +import { RATE_LIMIT_KEY, RateLimitConfig } from '../decorators/rate-limit.decorator'; + +/** + * Cost-aware rate limiting guard that enforces per-endpoint rate limits + * based on the cost of the operation and user authentication status. + */ +@Injectable() +export class CostAwareRateLimitGuard implements CanActivate { + // Default limits for different user types + private readonly defaultLimits = { + public: { limit: 10, window: 60 }, // 10 requests per minute + authenticated: { limit: 100, window: 60 }, // 100 requests per minute + apiKey: { limit: 1000, window: 60 }, // 1000 requests per minute + }; + + // Default costs for different endpoint categories + private readonly defaultCosts = { + read: 1, // GET requests + write: 5, // POST/PUT/PATCH requests + expensive: 20, // On-chain operations, expensive computations + bulk: 50, // Bulk operations + }; + + constructor( + private readonly redisService: RedisService, + private readonly reflector: Reflector, + ) {} + + async canActivate(context: ExecutionContext): Promise { + const request = context.switchToHttp().getRequest(); + const response = context.switchToHttp().getResponse(); + const client = this.redisService.getOrThrow(); + + // Get endpoint-specific rate limit config from decorator + const endpointConfig = this.reflector.get( + RATE_LIMIT_KEY, + context.getHandler(), + ); + + // Get user type for default limits + const userType = this.getUserType(request); + const defaultLimit = this.defaultLimits[userType]; + + // Calculate effective limit and cost + const config = this.calculateEffectiveConfig( + endpointConfig, + defaultLimit, + request, + ); + + const identifier = this.getIdentifier(request); + const endpointKey = this.getEndpointKey(request); + const key = `ratelimit:${userType}:${endpointKey}:${identifier}`; + + // Get current usage + const current = await client.incr(key); + if (current === 1) { + await client.expire(key, config.window); + } + + // Calculate remaining requests (accounting for cost) + const remaining = Math.floor((config.limit - current * config.cost) / config.cost); + + // Set rate limit headers + this.setRateLimitHeaders(response, config, remaining, key, client); + + // Check if limit exceeded + if (current * config.cost > config.limit) { + throw new HttpException( + { + statusCode: HttpStatus.TOO_MANY_REQUESTS, + message: 'Rate limit exceeded', + limit: config.limit, + remaining: Math.max(remaining, 0), + resetIn: await client.ttl(key), + cost: config.cost, + }, + HttpStatus.TOO_MANY_REQUESTS, + ); + } + + return true; + } + + private getUserType(request: any): keyof typeof this.defaultLimits { + const user = request.user; + if (user) { + if (user.authType === 'apiKey' || user.authType === 'envApiKey') { + return 'apiKey'; + } + return 'authenticated'; + } + return 'public'; + } + + private calculateEffectiveConfig( + endpointConfig: RateLimitConfig | undefined, + defaultLimit: { limit: number; window: number }, + request: any, + ): RateLimitConfig { + if (endpointConfig) { + return { + limit: endpointConfig.limit, + window: endpointConfig.window, + cost: endpointConfig.cost || 1, + skipSuccessfulRequests: endpointConfig.skipSuccessfulRequests, + }; + } + + // Auto-calculate based on HTTP method and path + const method = request.method?.toLowerCase() || 'get'; + const path = request.path || request.url || ''; + + let cost = this.defaultCosts.read; + if (['post', 'put', 'patch'].includes(method)) { + cost = this.defaultCosts.write; + } + if (path.includes('/disburse') || path.includes('/onchain')) { + cost = this.defaultCosts.expensive; + } + if (path.includes('/bulk') || path.includes('/batch')) { + cost = this.defaultCosts.bulk; + } + + return { + limit: defaultLimit.limit, + window: defaultLimit.window, + cost, + }; + } + + private getIdentifier(request: any): string { + const user = request.user; + if (user?.id) return user.id; + if (user?.apiKeyId) return user.apiKeyId; + + const forwardedIp = + Array.isArray(request.ips) && request.ips.length > 0 + ? request.ips[0] + : undefined; + return forwardedIp ?? request.ip ?? 'anonymous'; + } + + private getEndpointKey(request: any): string { + const method = request.method?.toLowerCase() || 'get'; + const path = request.path || request.url || ''; + // Normalize path to group similar endpoints + const normalizedPath = path.replace(/\/\d+/g, '/:id'); + return `${method}:${normalizedPath}`; + } + + private async setRateLimitHeaders( + response: any, + config: RateLimitConfig, + remaining: number, + key: string, + client: any, + ): Promise { + const ttl = await client.ttl(key); + + response.setHeader('RateLimit-Limit', config.limit.toString()); + response.setHeader('RateLimit-Remaining', Math.max(remaining, 0).toString()); + response.setHeader('RateLimit-Reset', ttl.toString()); + response.setHeader('RateLimit-Cost', config.cost.toString()); + response.setHeader('RateLimit-Window', config.window.toString()); + } +} diff --git a/app/backend/src/verification/verification.controller.ts b/app/backend/src/verification/verification.controller.ts index d4c327fc..8365fb57 100644 --- a/app/backend/src/verification/verification.controller.ts +++ b/app/backend/src/verification/verification.controller.ts @@ -8,6 +8,7 @@ import { HttpStatus, HttpCode, Request, + UseGuards, } from '@nestjs/common'; import { Request as ExpressRequest } from 'express'; import { @@ -37,6 +38,8 @@ import { AppRole } from 'src/auth/app-role.enum'; import { InternalNotesService } from 'src/common/services/internal-notes.service'; import { CreateInternalNoteDto } from 'src/common/dto/create-internal-note.dto'; import { InternalNoteResponseDto } from 'src/common/dto/internal-note-response.dto'; +import { CostAwareRateLimitGuard } from 'src/common/guards/cost-aware-rate-limit.guard'; +import { RateLimit } from 'src/common/decorators/rate-limit.decorator'; @ApiTags('Verification') @ApiSecurity('x-api-key') @@ -50,6 +53,8 @@ export class VerificationController { @Post('claims/:id/enqueue') @Version('1') + @UseGuards(CostAwareRateLimitGuard) + @RateLimit({ limit: 30, window: 60, cost: 5 }) // Write operation @HttpCode(HttpStatus.ACCEPTED) @ApiOperation({ summary: 'Enqueue claim verification job', diff --git a/docs/rate-limiting-cost-aware.md b/docs/rate-limiting-cost-aware.md new file mode 100644 index 00000000..a45fea26 --- /dev/null +++ b/docs/rate-limiting-cost-aware.md @@ -0,0 +1,245 @@ +# Cost-Aware Rate Limiting per Endpoint + +## Overview + +This document describes the cost-aware rate limiting system implemented for the Soter API. The system enforces per-endpoint rate limits based on the computational cost and resource usage of each operation. + +## Architecture + +### Components + +1. **Rate Limit Decorator** (`@RateLimit`) + - Decorator to configure rate limits per endpoint + - Supports custom limits, windows, and cost weights + - Applied directly to controller methods + +2. **Cost-Aware Rate Limit Guard** (`CostAwareRateLimitGuard`) + - Guard that enforces rate limits based on endpoint configuration + - Uses Redis for distributed rate limiting + - Automatically calculates costs based on HTTP method and path + - Sets rate limit headers in responses + +3. **Default Cost Categories** + - **Read operations (GET)**: Cost = 1 + - **Write operations (POST/PUT/PATCH)**: Cost = 5 + - **Expensive operations (on-chain)**: Cost = 20 + - **Bulk operations**: Cost = 50 + +## Configuration + +### Default Limits by User Type + +```typescript +{ + public: { limit: 10, window: 60 }, // 10 requests per minute + authenticated: { limit: 100, window: 60 }, // 100 requests per minute + apiKey: { limit: 1000, window: 60 } // 1000 requests per minute +} +``` + +### Using the Decorator + +```typescript +import { RateLimit } from 'src/common/decorators/rate-limit.decorator'; +import { UseGuards } from '@nestjs/common'; +import { CostAwareRateLimitGuard } from 'src/common/guards/cost-aware-rate-limit.guard'; + +@Post(':id/disburse') +@UseGuards(CostAwareRateLimitGuard) +@RateLimit({ limit: 10, window: 60, cost: 20 }) +async disburse(@Param('id') id: string) { + // Expensive on-chain operation +} +``` + +### Decorator Options + +```typescript +interface RateLimitConfig { + limit: number; // Maximum requests allowed + window: number; // Time window in seconds + cost?: number; // Cost weight (default: 1) + skipSuccessfulRequests?: boolean; // Don't count successful requests +} +``` + +## Applied Rate Limits + +### Claims Controller + +- **POST /claims** - Create claim + - Limit: 50 requests/minute + - Cost: 5 (write operation) + - User type: Authenticated (operator/admin) + +- **POST /claims/:id/disburse** - Disburse funds + - Limit: 10 requests/minute + - Cost: 20 (expensive on-chain operation) + - User type: Admin only + +### Verification Controller + +- **POST /verification/claims/:id/enqueue** - Enqueue verification + - Limit: 30 requests/minute + - Cost: 5 (write operation) + - User type: API key + +## Response Headers + +All rate-limited endpoints include the following headers: + +``` +RateLimit-Limit: 100 # Maximum requests allowed +RateLimit-Remaining: 95 # Remaining requests +RateLimit-Reset: 45 # Seconds until reset +RateLimit-Cost: 5 # Cost of this request +RateLimit-Window: 60 # Time window in seconds +``` + +## Automatic Cost Calculation + +If no decorator is applied, the guard automatically calculates costs based on: + +1. **HTTP Method** + - GET: Cost = 1 + - POST/PUT/PATCH: Cost = 5 + - DELETE: Cost = 5 + +2. **Path Patterns** + - `/disburse`, `/onchain`: Cost = 20 (expensive) + - `/bulk`, `/batch`: Cost = 50 (bulk operations) + +## Rate Limit Exceeded Response + +When a rate limit is exceeded, the API returns: + +```json +{ + "statusCode": 429, + "message": "Rate limit exceeded", + "limit": 10, + "remaining": 0, + "resetIn": 45, + "cost": 20 +} +``` + +## Redis Key Structure + +Rate limit keys are stored in Redis with the following format: + +``` +ratelimit:{userType}:{endpoint}:{identifier} +``` + +Example: +``` +ratelimit:authenticated:post:/claims/:id/disburse:user_123 +ratelimit:apiKey:post:/verification/claims/:id/enqueue:api_key_456 +ratelimit:public:get:/claims/:anonymous_ip +``` + +## Configuration via Environment Variables + +The system respects the following environment variables: + +```bash +# Redis configuration +REDIS_HOST=localhost +REDIS_PORT=6379 + +# Default rate limits (fallback) +API_RATE_LIMIT=100 +THROTTLE_TTL=60000 +``` + +## Monitoring and Metrics + +The rate limiting system logs: +- Rate limit violations +- Current usage statistics +- Endpoint-specific costs + +To monitor rate limiting: +- Check Redis for current rate limit keys +- Monitor response headers for rate limit information +- Review logs for rate limit violations + +## Best Practices + +1. **Apply to Expensive Operations** + - Always rate limit on-chain operations + - Rate limit bulk operations + - Rate limit resource-intensive computations + +2. **Choose Appropriate Costs** + - Read operations: Cost 1-5 + - Write operations: Cost 5-10 + - Expensive operations: Cost 20-50 + - Bulk operations: Cost 50+ + +3. **Set Reasonable Limits** + - Public endpoints: 10-100 requests/minute + - Authenticated: 100-1000 requests/minute + - API keys: 1000+ requests/minute + +4. **Monitor Usage** + - Track rate limit violations + - Adjust limits based on actual usage patterns + - Consider implementing tiered rate limits + +## Troubleshooting + +### Rate Limits Too Strict + +If users are hitting rate limits too frequently: +1. Review the cost assigned to the endpoint +2. Increase the limit for the user type +3. Consider implementing tiered rate limits based on user tier + +### Rate Limits Not Working + +If rate limits are not being enforced: +1. Verify Redis is running and accessible +2. Check that the guard is applied to the endpoint +3. Verify the decorator configuration +4. Check logs for errors + +### Headers Not Appearing + +If rate limit headers are missing: +1. Verify the guard is being executed +2. Check for other middleware that might remove headers +3. Review the response object structure + +## Future Enhancements + +1. **Tiered Rate Limits** + - Implement different limits based on user subscription tier + - Allow users to purchase higher rate limits + +2. **Dynamic Cost Adjustment** + - Adjust costs based on system load + - Implement circuit breakers for overloaded endpoints + +3. **Burst Allowance** + - Allow temporary bursts above normal limits + - Implement token bucket algorithm + +4. **Rate Limit Analytics** + - Dashboard for monitoring rate limit usage + - Alerts for unusual patterns + - Historical analysis of rate limit violations + +## Security Considerations + +- Rate limits are enforced per identifier (user ID, API key, or IP) +- Redis keys include user type to prevent privilege escalation +- Rate limit headers are informational only (do not rely on them for enforcement) +- Consider implementing IP-based rate limiting for public endpoints as additional protection + +--- + +**Implementation Date:** 2026-05-29 +**Version:** 1.0 +**Author:** Cascade AI Assistant