Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions backend/src/common/common.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { JwtModule } from '@nestjs/jwt';
import { PassportModule } from '@nestjs/passport';
import { TypeOrmModule } from '@nestjs/typeorm';
import { FilteringService } from './filtering.service';
import { IdempotencyKey } from './idempotency/idempotency-key.entity';
import { IdempotencyService } from './idempotency/idempotency.service';
import { IdempotencyInterceptor } from './idempotency/idempotency.interceptor';

@Module({
imports: [
Expand All @@ -17,8 +21,9 @@ import { FilteringService } from './filtering.service';
},
}),
}),
TypeOrmModule.forFeature([IdempotencyKey]),
],
providers: [FilteringService],
exports: [JwtModule, FilteringService],
providers: [FilteringService, IdempotencyService, IdempotencyInterceptor],
exports: [JwtModule, FilteringService, IdempotencyService, IdempotencyInterceptor],
})
export class CommonModule {}
37 changes: 37 additions & 0 deletions backend/src/common/idempotency/idempotency-key.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import {
Entity,
PrimaryGeneratedColumn,
Column,
CreateDateColumn,
Index,
} from 'typeorm';

@Entity('idempotency_keys')
@Index(['key', 'userId'], { unique: true })
export class IdempotencyKey {
@PrimaryGeneratedColumn('uuid')
id: string;

@Column({ type: 'varchar', length: 255 })
key: string;

@Column({ type: 'uuid' })
userId: string;

/** sha256 of method+path+body — detects same-key/different-body reuse */
@Column({ name: 'request_hash', type: 'varchar', length: 64 })
request_hash: string;

/** Null while the original request is still executing */
@Column({ name: 'status_code', type: 'int', nullable: true })
status_code: number | null;

@Column({ name: 'response_body', type: 'jsonb', nullable: true })
response_body: unknown;

@Column({ name: 'in_progress', type: 'boolean', default: true })
in_progress: boolean;

@CreateDateColumn({ type: 'timestamptz' })
created_at: Date;
}
72 changes: 72 additions & 0 deletions backend/src/common/idempotency/idempotency.interceptor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import {
BadRequestException,
CallHandler,
ConflictException,
ExecutionContext,
Injectable,
NestInterceptor,
UnprocessableEntityException,
} from '@nestjs/common';
import { createHash } from 'crypto';
import type { Request } from 'express';
import { Observable, of, throwError } from 'rxjs';
import { catchError, tap } from 'rxjs/operators';
import { IdempotencyService } from './idempotency.service';

const IDEMPOTENCY_HEADER = 'idempotency-key';

@Injectable()
export class IdempotencyInterceptor implements NestInterceptor {
constructor(private readonly idempotencyService: IdempotencyService) {}

async intercept(
context: ExecutionContext,
next: CallHandler,
): Promise<Observable<unknown>> {
const request = context.switchToHttp().getRequest<Request & { user: { id: string } }>();
const key = request.headers[IDEMPOTENCY_HEADER];

if (!key || typeof key !== 'string') {
throw new BadRequestException(
`${IDEMPOTENCY_HEADER} header is required for this request`,
);
}

const requestHash = createHash('sha256')
.update(`${request.method}:${request.originalUrl}:${JSON.stringify(request.body ?? {})}`)
.digest('hex');

const result = await this.idempotencyService.acquire(
key,
request.user.id,
requestHash,
);

if (!result.acquired) {
const { record } = result;
if (record.request_hash !== requestHash) {
throw new UnprocessableEntityException(
'Idempotency-Key was already used with a different request body',
);
}
if (record.in_progress) {
throw new ConflictException(
'A request with this Idempotency-Key is already in progress',
);
}
return of(record.response_body);
}

const { record } = result;
return next.handle().pipe(
tap((data) => {
const response = context.switchToHttp().getResponse<{ statusCode: number }>();
void this.idempotencyService.complete(record.id, response.statusCode, data);
}),
catchError((err) => {
void this.idempotencyService.release(record.id);
return throwError(() => err);
}),
);
}
}
77 changes: 77 additions & 0 deletions backend/src/common/idempotency/idempotency.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Cron, CronExpression } from '@nestjs/schedule';
import { LessThan, Repository } from 'typeorm';
import { IdempotencyKey } from './idempotency-key.entity';

const POSTGRES_UNIQUE_VIOLATION = '23505';
const CLEANUP_AGE_MS = 24 * 60 * 60 * 1000;

export type AcquireResult =
| { acquired: true; record: IdempotencyKey }
| { acquired: false; record: IdempotencyKey };

@Injectable()
export class IdempotencyService {
private readonly logger = new Logger(IdempotencyService.name);

constructor(
@InjectRepository(IdempotencyKey)
private readonly repository: Repository<IdempotencyKey>,
) {}

/**
* Atomically claims a key for a user. Relies on the unique (key, userId)
* index to detect a concurrent or prior request with the same key.
*/
async acquire(key: string, userId: string, requestHash: string): Promise<AcquireResult> {
try {
const inserted = await this.repository.save(
this.repository.create({
key,
userId,
request_hash: requestHash,
in_progress: true,
}),
);
return { acquired: true, record: inserted };
} catch (err) {
if ((err as { code?: string }).code !== POSTGRES_UNIQUE_VIOLATION) {
throw err;
}
const existing = await this.repository.findOneBy({ key, userId });
if (!existing) {
throw err;
}
return { acquired: false, record: existing };
}
}

async complete(
id: string,
statusCode: number,
responseBody: unknown,
): Promise<void> {
await this.repository.update(id, {
status_code: statusCode,
response_body: responseBody as object,
in_progress: false,
});
}

/** Frees the key so the client can safely retry after a failed handler. */
async release(id: string): Promise<void> {
await this.repository.delete(id);
}

@Cron(CronExpression.EVERY_HOUR)
async cleanupExpiredKeys(): Promise<void> {
const cutoff = new Date(Date.now() - CLEANUP_AGE_MS);
const { affected } = await this.repository.delete({
created_at: LessThan(cutoff),
});
if (affected) {
this.logger.log(`Cleaned up ${affected} expired idempotency key(s)`);
}
}
}
12 changes: 12 additions & 0 deletions backend/src/common/idempotency/idempotent.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { applyDecorators, SetMetadata, UseInterceptors } from '@nestjs/common';
import { IdempotencyInterceptor } from './idempotency.interceptor';

export const IDEMPOTENT_KEY = 'idempotent';

/** Marks a write route as requiring an `Idempotency-Key` request header. */
export function Idempotent() {
return applyDecorators(
SetMetadata(IDEMPOTENT_KEY, true),
UseInterceptors(IdempotencyInterceptor),
);
}
103 changes: 103 additions & 0 deletions backend/src/migrations/1776500000000-CreateIdempotencyKeys.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import {
MigrationInterface,
QueryRunner,
Table,
TableIndex,
TableForeignKey,
} from 'typeorm';

export class CreateIdempotencyKeys1776500000000 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.createTable(
new Table({
name: 'idempotency_keys',
columns: [
{
name: 'id',
type: 'uuid',
isPrimary: true,
generationStrategy: 'uuid',
default: 'uuid_generate_v4()',
},
{
name: 'key',
type: 'varchar',
length: '255',
isNullable: false,
},
{
name: 'userId',
type: 'uuid',
isNullable: false,
},
{
// sha256 of method+path+body — detects same-key/different-body reuse
name: 'request_hash',
type: 'varchar',
length: '64',
isNullable: false,
},
{
// Null while the original request is still executing
name: 'status_code',
type: 'int',
isNullable: true,
},
{
name: 'response_body',
type: 'jsonb',
isNullable: true,
},
{
name: 'in_progress',
type: 'boolean',
default: true,
isNullable: false,
},
{
name: 'created_at',
type: 'timestamptz',
default: 'CURRENT_TIMESTAMP',
isNullable: false,
},
],
}),
true,
);

// Enforces one row per (key, user) — the source of the 409/422 checks
await queryRunner.createIndex(
'idempotency_keys',
new TableIndex({
name: 'IDX_idempotency_key_user',
columnNames: ['key', 'userId'],
isUnique: true,
}),
);

// Used by the cleanup cron to find expired rows
await queryRunner.createIndex(
'idempotency_keys',
new TableIndex({
name: 'IDX_idempotency_created_at',
columnNames: ['created_at'],
}),
);

await queryRunner.createForeignKey(
'idempotency_keys',
new TableForeignKey({
name: 'FK_idempotency_user',
columnNames: ['userId'],
referencedTableName: 'users',
referencedColumnNames: ['id'],
onDelete: 'CASCADE',
}),
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.dropForeignKey('idempotency_keys', 'FK_idempotency_user');
await queryRunner.dropTable('idempotency_keys');
}
}
10 changes: 8 additions & 2 deletions backend/src/predictions/predictions.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
} from './dto/list-market-predictions.dto';
import { CurrentUser } from '../common/decorators/current-user.decorator';
import { Public } from '../common/decorators/public.decorator';
import { Idempotent } from '../common/idempotency/idempotent.decorator';
import { User } from '../users/entities/user.entity';
import { Prediction } from './entities/prediction.entity';

Expand All @@ -44,18 +45,23 @@ export class PredictionsController {

@Post()
@UseGuards(BanGuard)
@Idempotent()
@HttpCode(HttpStatus.CREATED)
@ApiOperation({ summary: 'Submit a prediction on a market' })
@ApiResponse({
status: 201,
description: 'Prediction submitted',
type: Prediction,
})
@ApiResponse({ status: 400, description: 'Market closed or invalid outcome' })
@ApiResponse({ status: 400, description: 'Market closed, invalid outcome, or missing Idempotency-Key' })
@ApiResponse({ status: 404, description: 'Market not found' })
@ApiResponse({
status: 409,
description: 'Duplicate prediction on this market',
description: 'Duplicate prediction on this market, or a request with the same Idempotency-Key is already in progress',
})
@ApiResponse({
status: 422,
description: 'Idempotency-Key reused with a different request body',
})
async submit(
@Body() dto: SubmitPredictionDto,
Expand Down
2 changes: 2 additions & 0 deletions backend/src/predictions/predictions.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { PredictionsController } from './predictions.controller';
import { UsersModule } from '../users/users.module';
import { MarketsModule } from '../markets/markets.module';
import { SorobanModule } from '../soroban/soroban.module';
import { CommonModule } from '../common/common.module';
import { User } from '../users/entities/user.entity';
import { Market } from '../markets/entities/market.entity';

Expand All @@ -15,6 +16,7 @@ import { Market } from '../markets/entities/market.entity';
UsersModule,
MarketsModule,
SorobanModule,
CommonModule,
],
controllers: [PredictionsController],
providers: [PredictionsService],
Expand Down
Loading