diff --git a/backend/src/common/common.module.ts b/backend/src/common/common.module.ts index 7010573d..4ca42823 100644 --- a/backend/src/common/common.module.ts +++ b/backend/src/common/common.module.ts @@ -2,7 +2,11 @@ import { Module } from '@nestjs/common'; import { ConfigModule, ConfigService } from '@nestjs/config'; import { JwtModule } from '@nestjs/jwt'; import { PassportModule } from '@nestjs/passport'; +import { TypeOrmModule } from '@nestjs/typeorm'; import { FilteringService } from './filtering.service'; +import { IdempotencyKey } from './idempotency/idempotency-key.entity'; +import { IdempotencyService } from './idempotency/idempotency.service'; +import { IdempotencyInterceptor } from './idempotency/idempotency.interceptor'; @Module({ imports: [ @@ -17,8 +21,9 @@ import { FilteringService } from './filtering.service'; }, }), }), + TypeOrmModule.forFeature([IdempotencyKey]), ], - providers: [FilteringService], - exports: [JwtModule, FilteringService], + providers: [FilteringService, IdempotencyService, IdempotencyInterceptor], + exports: [JwtModule, FilteringService, IdempotencyService, IdempotencyInterceptor], }) export class CommonModule {} diff --git a/backend/src/common/idempotency/idempotency-key.entity.ts b/backend/src/common/idempotency/idempotency-key.entity.ts new file mode 100644 index 00000000..8bf8621f --- /dev/null +++ b/backend/src/common/idempotency/idempotency-key.entity.ts @@ -0,0 +1,37 @@ +import { + Entity, + PrimaryGeneratedColumn, + Column, + CreateDateColumn, + Index, +} from 'typeorm'; + +@Entity('idempotency_keys') +@Index(['key', 'userId'], { unique: true }) +export class IdempotencyKey { + @PrimaryGeneratedColumn('uuid') + id: string; + + @Column({ type: 'varchar', length: 255 }) + key: string; + + @Column({ type: 'uuid' }) + userId: string; + + /** sha256 of method+path+body — detects same-key/different-body reuse */ + @Column({ name: 'request_hash', type: 'varchar', length: 64 }) + request_hash: string; + + /** Null while the original request is still executing */ + @Column({ name: 'status_code', type: 'int', nullable: true }) + status_code: number | null; + + @Column({ name: 'response_body', type: 'jsonb', nullable: true }) + response_body: unknown; + + @Column({ name: 'in_progress', type: 'boolean', default: true }) + in_progress: boolean; + + @CreateDateColumn({ type: 'timestamptz' }) + created_at: Date; +} diff --git a/backend/src/common/idempotency/idempotency.interceptor.ts b/backend/src/common/idempotency/idempotency.interceptor.ts new file mode 100644 index 00000000..8a8a314f --- /dev/null +++ b/backend/src/common/idempotency/idempotency.interceptor.ts @@ -0,0 +1,72 @@ +import { + BadRequestException, + CallHandler, + ConflictException, + ExecutionContext, + Injectable, + NestInterceptor, + UnprocessableEntityException, +} from '@nestjs/common'; +import { createHash } from 'crypto'; +import type { Request } from 'express'; +import { Observable, of, throwError } from 'rxjs'; +import { catchError, tap } from 'rxjs/operators'; +import { IdempotencyService } from './idempotency.service'; + +const IDEMPOTENCY_HEADER = 'idempotency-key'; + +@Injectable() +export class IdempotencyInterceptor implements NestInterceptor { + constructor(private readonly idempotencyService: IdempotencyService) {} + + async intercept( + context: ExecutionContext, + next: CallHandler, + ): Promise> { + const request = context.switchToHttp().getRequest(); + const key = request.headers[IDEMPOTENCY_HEADER]; + + if (!key || typeof key !== 'string') { + throw new BadRequestException( + `${IDEMPOTENCY_HEADER} header is required for this request`, + ); + } + + const requestHash = createHash('sha256') + .update(`${request.method}:${request.originalUrl}:${JSON.stringify(request.body ?? {})}`) + .digest('hex'); + + const result = await this.idempotencyService.acquire( + key, + request.user.id, + requestHash, + ); + + if (!result.acquired) { + const { record } = result; + if (record.request_hash !== requestHash) { + throw new UnprocessableEntityException( + 'Idempotency-Key was already used with a different request body', + ); + } + if (record.in_progress) { + throw new ConflictException( + 'A request with this Idempotency-Key is already in progress', + ); + } + return of(record.response_body); + } + + const { record } = result; + return next.handle().pipe( + tap((data) => { + const response = context.switchToHttp().getResponse<{ statusCode: number }>(); + void this.idempotencyService.complete(record.id, response.statusCode, data); + }), + catchError((err) => { + void this.idempotencyService.release(record.id); + return throwError(() => err); + }), + ); + } +} diff --git a/backend/src/common/idempotency/idempotency.service.ts b/backend/src/common/idempotency/idempotency.service.ts new file mode 100644 index 00000000..f15ac468 --- /dev/null +++ b/backend/src/common/idempotency/idempotency.service.ts @@ -0,0 +1,77 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { LessThan, Repository } from 'typeorm'; +import { IdempotencyKey } from './idempotency-key.entity'; + +const POSTGRES_UNIQUE_VIOLATION = '23505'; +const CLEANUP_AGE_MS = 24 * 60 * 60 * 1000; + +export type AcquireResult = + | { acquired: true; record: IdempotencyKey } + | { acquired: false; record: IdempotencyKey }; + +@Injectable() +export class IdempotencyService { + private readonly logger = new Logger(IdempotencyService.name); + + constructor( + @InjectRepository(IdempotencyKey) + private readonly repository: Repository, + ) {} + + /** + * Atomically claims a key for a user. Relies on the unique (key, userId) + * index to detect a concurrent or prior request with the same key. + */ + async acquire(key: string, userId: string, requestHash: string): Promise { + try { + const inserted = await this.repository.save( + this.repository.create({ + key, + userId, + request_hash: requestHash, + in_progress: true, + }), + ); + return { acquired: true, record: inserted }; + } catch (err) { + if ((err as { code?: string }).code !== POSTGRES_UNIQUE_VIOLATION) { + throw err; + } + const existing = await this.repository.findOneBy({ key, userId }); + if (!existing) { + throw err; + } + return { acquired: false, record: existing }; + } + } + + async complete( + id: string, + statusCode: number, + responseBody: unknown, + ): Promise { + await this.repository.update(id, { + status_code: statusCode, + response_body: responseBody as object, + in_progress: false, + }); + } + + /** Frees the key so the client can safely retry after a failed handler. */ + async release(id: string): Promise { + await this.repository.delete(id); + } + + @Cron(CronExpression.EVERY_HOUR) + async cleanupExpiredKeys(): Promise { + const cutoff = new Date(Date.now() - CLEANUP_AGE_MS); + const { affected } = await this.repository.delete({ + created_at: LessThan(cutoff), + }); + if (affected) { + this.logger.log(`Cleaned up ${affected} expired idempotency key(s)`); + } + } +} diff --git a/backend/src/common/idempotency/idempotent.decorator.ts b/backend/src/common/idempotency/idempotent.decorator.ts new file mode 100644 index 00000000..4f3d7b3a --- /dev/null +++ b/backend/src/common/idempotency/idempotent.decorator.ts @@ -0,0 +1,12 @@ +import { applyDecorators, SetMetadata, UseInterceptors } from '@nestjs/common'; +import { IdempotencyInterceptor } from './idempotency.interceptor'; + +export const IDEMPOTENT_KEY = 'idempotent'; + +/** Marks a write route as requiring an `Idempotency-Key` request header. */ +export function Idempotent() { + return applyDecorators( + SetMetadata(IDEMPOTENT_KEY, true), + UseInterceptors(IdempotencyInterceptor), + ); +} diff --git a/backend/src/migrations/1776500000000-CreateIdempotencyKeys.ts b/backend/src/migrations/1776500000000-CreateIdempotencyKeys.ts new file mode 100644 index 00000000..7fb497de --- /dev/null +++ b/backend/src/migrations/1776500000000-CreateIdempotencyKeys.ts @@ -0,0 +1,103 @@ +import { + MigrationInterface, + QueryRunner, + Table, + TableIndex, + TableForeignKey, +} from 'typeorm'; + +export class CreateIdempotencyKeys1776500000000 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.createTable( + new Table({ + name: 'idempotency_keys', + columns: [ + { + name: 'id', + type: 'uuid', + isPrimary: true, + generationStrategy: 'uuid', + default: 'uuid_generate_v4()', + }, + { + name: 'key', + type: 'varchar', + length: '255', + isNullable: false, + }, + { + name: 'userId', + type: 'uuid', + isNullable: false, + }, + { + // sha256 of method+path+body — detects same-key/different-body reuse + name: 'request_hash', + type: 'varchar', + length: '64', + isNullable: false, + }, + { + // Null while the original request is still executing + name: 'status_code', + type: 'int', + isNullable: true, + }, + { + name: 'response_body', + type: 'jsonb', + isNullable: true, + }, + { + name: 'in_progress', + type: 'boolean', + default: true, + isNullable: false, + }, + { + name: 'created_at', + type: 'timestamptz', + default: 'CURRENT_TIMESTAMP', + isNullable: false, + }, + ], + }), + true, + ); + + // Enforces one row per (key, user) — the source of the 409/422 checks + await queryRunner.createIndex( + 'idempotency_keys', + new TableIndex({ + name: 'IDX_idempotency_key_user', + columnNames: ['key', 'userId'], + isUnique: true, + }), + ); + + // Used by the cleanup cron to find expired rows + await queryRunner.createIndex( + 'idempotency_keys', + new TableIndex({ + name: 'IDX_idempotency_created_at', + columnNames: ['created_at'], + }), + ); + + await queryRunner.createForeignKey( + 'idempotency_keys', + new TableForeignKey({ + name: 'FK_idempotency_user', + columnNames: ['userId'], + referencedTableName: 'users', + referencedColumnNames: ['id'], + onDelete: 'CASCADE', + }), + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.dropForeignKey('idempotency_keys', 'FK_idempotency_user'); + await queryRunner.dropTable('idempotency_keys'); + } +} diff --git a/backend/src/predictions/predictions.controller.ts b/backend/src/predictions/predictions.controller.ts index e3a40f2c..f2446765 100644 --- a/backend/src/predictions/predictions.controller.ts +++ b/backend/src/predictions/predictions.controller.ts @@ -33,6 +33,7 @@ import { } from './dto/list-market-predictions.dto'; import { CurrentUser } from '../common/decorators/current-user.decorator'; import { Public } from '../common/decorators/public.decorator'; +import { Idempotent } from '../common/idempotency/idempotent.decorator'; import { User } from '../users/entities/user.entity'; import { Prediction } from './entities/prediction.entity'; @@ -44,6 +45,7 @@ export class PredictionsController { @Post() @UseGuards(BanGuard) + @Idempotent() @HttpCode(HttpStatus.CREATED) @ApiOperation({ summary: 'Submit a prediction on a market' }) @ApiResponse({ @@ -51,11 +53,15 @@ export class PredictionsController { description: 'Prediction submitted', type: Prediction, }) - @ApiResponse({ status: 400, description: 'Market closed or invalid outcome' }) + @ApiResponse({ status: 400, description: 'Market closed, invalid outcome, or missing Idempotency-Key' }) @ApiResponse({ status: 404, description: 'Market not found' }) @ApiResponse({ status: 409, - description: 'Duplicate prediction on this market', + description: 'Duplicate prediction on this market, or a request with the same Idempotency-Key is already in progress', + }) + @ApiResponse({ + status: 422, + description: 'Idempotency-Key reused with a different request body', }) async submit( @Body() dto: SubmitPredictionDto, diff --git a/backend/src/predictions/predictions.module.ts b/backend/src/predictions/predictions.module.ts index ebe02ef6..cca999f0 100644 --- a/backend/src/predictions/predictions.module.ts +++ b/backend/src/predictions/predictions.module.ts @@ -6,6 +6,7 @@ import { PredictionsController } from './predictions.controller'; import { UsersModule } from '../users/users.module'; import { MarketsModule } from '../markets/markets.module'; import { SorobanModule } from '../soroban/soroban.module'; +import { CommonModule } from '../common/common.module'; import { User } from '../users/entities/user.entity'; import { Market } from '../markets/entities/market.entity'; @@ -15,6 +16,7 @@ import { Market } from '../markets/entities/market.entity'; UsersModule, MarketsModule, SorobanModule, + CommonModule, ], controllers: [PredictionsController], providers: [PredictionsService],