Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Migration: Optimize Payment Processor SQL Queries
* Issue #924: Optimize SQL queries in Payment Processor
*
* Adds targeted indexes to improve query performance for the most
* frequently executed payment service queries, particularly the
* payment listing and rolling metrics endpoints.
*/

/**
 * Knex per-migration settings.
 *
 * Knex wraps each migration in a transaction by default, but PostgreSQL refuses
 * to run CREATE INDEX CONCURRENTLY / DROP INDEX CONCURRENTLY inside a
 * transaction block. Every statement in this migration uses CONCURRENTLY, so
 * the transaction has to be disabled for both `up` and `down`.
 */
export const config = { transaction: false };

/**
 * Runs a CREATE INDEX statement that is allowed to fail (best effort).
 *
 * The failure is logged together with the database's own error message so an
 * unexpected cause (lock timeout, typo, permissions) is not mistaken for the
 * anticipated "extension not available" case.
 *
 * @param knex Knex instance handed to the migration.
 * @param sql The CREATE INDEX statement to execute.
 * @param skipNotice Message printed when the statement fails.
 */
async function createOptionalIndex(knex, sql, skipNotice) {
  try {
    await knex.raw(sql);
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    console.log(`${skipNotice}: ${reason}`);
  }
}

/**
 * Adds the payment query-optimization indexes.
 *
 * Each index is created with CONCURRENTLY and IF NOT EXISTS, so the migration
 * can be re-run after a partial failure. The trigram search index and the x402
 * partial index are optional: a failure there is logged and skipped. Any other
 * failure rejects the returned promise.
 *
 * NOTE(review): a failed CONCURRENTLY build can leave an INVALID index behind,
 * which IF NOT EXISTS will then silently skip — drop such an index before
 * re-running the migration.
 *
 * @param knex Knex instance handed to the migration.
 */
export async function up(knex) {
  // Trigram index for text search on description.
  // The existing merchant_id index doesn't cover text search well.
  await createOptionalIndex(
    knex,
    `
    CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_payments_description_search
    ON payments USING gin (description gin_trgm_ops)
    WHERE deleted_at IS NULL AND description IS NOT NULL
    `,
    " ℹ️ Skipping gin_trgm_ops index (extension not available)",
  );

  // Covering index for payment status endpoint (avoids heap lookup for common fields)
  await knex.raw(`
    CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_payments_status_covering
    ON payments (id)
    INCLUDE (merchant_id, amount, asset, asset_issuer, recipient, status, tx_id, created_at)
    WHERE deleted_at IS NULL
  `);

  // Index for refund lookups: confirmed payments by merchant with tx_id
  await knex.raw(`
    CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_payments_merchant_refund
    ON payments (merchant_id, status, id)
    INCLUDE (amount, asset, asset_issuer, recipient, tx_id, metadata)
    WHERE deleted_at IS NULL AND status = 'confirmed'
  `);

  // Index for path payment quote lookups
  await knex.raw(`
    CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_payments_quote_lookup
    ON payments (id, status, asset, asset_issuer, recipient, amount)
    WHERE deleted_at IS NULL
  `);

  // Index for the rolling metrics time-range query with amount aggregation
  await knex.raw(`
    CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_payments_metrics_window
    ON payments (merchant_id, created_at DESC)
    INCLUDE (amount, status)
    WHERE deleted_at IS NULL
  `);

  // Partial index for x402 payments
  await createOptionalIndex(
    knex,
    `
    CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_payments_x402_status
    ON payments (status, created_at DESC)
    WHERE deleted_at IS NULL AND metadata->>'x402_version' IS NOT NULL
    `,
    " ℹ️ Skipping x402 partial index (column or extension not available)",
  );

  console.log("✓ Added Payment Processor query optimization indexes");
}

/**
 * Reverts the migration by dropping each index that `up` creates.
 *
 * The drops are issued one at a time, in the order the indexes are created,
 * using DROP INDEX CONCURRENTLY IF EXISTS. The first failing statement rejects
 * the returned promise and the remaining drops are not attempted.
 *
 * @param knex Knex instance handed to the migration.
 */
export async function down(knex) {
  const indexNames = [
    "idx_payments_description_search",
    "idx_payments_status_covering",
    "idx_payments_merchant_refund",
    "idx_payments_quote_lookup",
    "idx_payments_metrics_window",
    "idx_payments_x402_status",
  ];

  for (const indexName of indexNames) {
    await knex.raw(`DROP INDEX CONCURRENTLY IF EXISTS ${indexName}`);
  }

  console.log("✓ Removed Payment Processor query optimization indexes");
}
263 changes: 239 additions & 24 deletions backend/services/path-payment/errorRecovery.ts
Original file line number Diff line number Diff line change
@@ -1,63 +1,278 @@
import { logger } from '../../src/lib/logging';
import { logger } from '../../src/lib/logger.js';

/**
 * States of the circuit breaker maintained by `ErrorRecovery`.
 */
export enum CircuitState {
// Normal operation: `executeWithRetry` runs the operation.
CLOSED = 'CLOSED',
// Tripped: `executeWithRetry` rejects immediately without running the operation.
OPEN = 'OPEN',
// Trial period entered when the reset timer fires after a trip: operations run
// again; enough successes close the circuit, any failure re-opens it.
HALF_OPEN = 'HALF_OPEN'
}

/**
 * Coarse classification of a failure, used to decide whether to retry.
 * TRANSIENT and RATE_LIMITED are treated as retryable; PERMANENT and AUTH are not.
 */
export enum ErrorCategory {
// Expected to possibly succeed on a later attempt; also the fallback for unrecognised errors.
TRANSIENT = 'transient',
// Not expected to succeed on retry (e.g. a reason listed in NON_RETRYABLE_REASONS).
PERMANENT = 'permanent',
// The caller was throttled (e.g. HTTP 429).
RATE_LIMITED = 'rate_limited',
// Authentication/authorization failure (HTTP 401 or 403).
AUTH = 'auth'
}

/**
 * Tuning knobs accepted by the `ErrorRecovery` constructor.
 * Every field is optional; the defaults noted below are the ones the constructor applies.
 */
interface ErrorRecoveryOptions {
// Retries allowed after the first attempt (default 3), i.e. up to maxRetries + 1 invocations.
maxRetries?: number;
// Number of failed `executeWithRetry` calls (since the last success while CLOSED)
// that trips the circuit breaker (default 5).
failureThreshold?: number;
// Milliseconds the circuit stays OPEN before moving to HALF_OPEN (default 60_000).
resetTimeoutMs?: number;
// Base delay in milliseconds for the exponential backoff and its jitter (default 1_000).
baseDelayMs?: number;
// Upper bound in milliseconds for a single backoff delay (default 30_000).
maxDelayMs?: number;
// Name included in log entries and error messages (default 'payment-processor').
label?: string;
}

/**
 * Counters exposed by `ErrorRecovery.getMetrics()`.
 * The visible code only ever increments or overwrites these; `reset()` does not clear them.
 */
interface RecoveryMetrics {
// Number of times the wrapped operation was invoked (every try, including retries).
totalAttempts: number;
// Number of invocations that resolved successfully.
successCount: number;
// Number of `executeWithRetry` calls that ultimately failed (not individual retries).
failureCount: number;
// Number of times the circuit breaker moved to OPEN.
circuitBreakerTrips: number;
// `Date.now()` timestamp of the most recent recorded failure, or null if none yet.
lastFailureTime: number | null;
// `Date.now()` timestamp of the most recent success, or null if none yet.
lastSuccessTime: number | null;
}

// Error codes regarded as safe to retry. The values match PostgreSQL SQLSTATE
// codes (see the PostgreSQL error-code appendix).
// NOTE(review): confirm this set is actually consulted by the classification logic.
const RETRYABLE_ERROR_CODES = new Set([
// Class 08 — connection exceptions (connection_exception, connection_does_not_exist,
// connection_failure, protocol_violation).
'08000', '08003', '08006', '08P01',
// Class 40 — transaction rollback (serialization_failure, deadlock_detected).
'40001', '40P01',
// too_many_connections, admin_shutdown, crash_shutdown, cannot_connect_now.
'53300', '57P01', '57P02', '57P03',
]);

// Failure reasons that are never retried: an error whose `reason` (or, failing
// that, `message`) is exactly one of these strings is classified as PERMANENT.
const NON_RETRYABLE_REASONS = new Set([
'invalid_signature', 'malformed_request', 'authorization_failure',
'invalid_input', 'not_found', 'duplicate',
]);

/**
 * Maps an arbitrary thrown value onto an `ErrorCategory`.
 *
 * Checks run in this order:
 *  1. `reason` (or `message`) listed in NON_RETRYABLE_REASONS -> PERMANENT
 *  2. `code` of '429' -> RATE_LIMITED
 *  3. `code` in SQLSTATE class 08 or listed in RETRYABLE_ERROR_CODES -> TRANSIENT
 *  4. HTTP status (`status` or `response.status`):
 *     401/403 -> AUTH, 429 -> RATE_LIMITED, 408 -> TRANSIENT,
 *     any other 4xx -> PERMANENT (the request itself is at fault, so retrying
 *     the same call cannot help)
 *  5. everything else, including 5xx and unrecognised errors -> TRANSIENT
 *
 * @param error The value caught from a failed operation; may be anything.
 * @returns The category used to decide whether the operation is retried.
 */
function classifyError(error: unknown): ErrorCategory {
  if (!error) return ErrorCategory.TRANSIENT;

  // Thrown values are untyped, so read the fields of interest defensively.
  const err = error as {
    reason?: unknown;
    message?: unknown;
    code?: unknown;
    status?: unknown;
    response?: { status?: unknown } | null;
  };

  const reason = err.reason || err.message || '';
  if (typeof reason === 'string' && NON_RETRYABLE_REASONS.has(reason)) {
    return ErrorCategory.PERMANENT;
  }

  const code = String(err.code || '');
  if (code === '429') return ErrorCategory.RATE_LIMITED;
  // 57P01 (admin_shutdown) and the other listed SQLSTATE codes describe an
  // unavailable or restarting database, not throttling.
  if (code.startsWith('08') || RETRYABLE_ERROR_CODES.has(code)) {
    return ErrorCategory.TRANSIENT;
  }

  // Coerce so a status delivered as a string ('429') is still recognised;
  // a missing status becomes NaN and fails every comparison below.
  const status = Number(err.status || err.response?.status);
  if (status === 401 || status === 403) return ErrorCategory.AUTH;
  if (status === 429) return ErrorCategory.RATE_LIMITED;
  if (status === 408) return ErrorCategory.TRANSIENT;
  if (status >= 400 && status < 500) return ErrorCategory.PERMANENT;

  return ErrorCategory.TRANSIENT;
}

/**
 * Tells whether a failed operation is worth another attempt.
 *
 * @param error The value caught from the failed operation.
 * @returns `true` when the error classifies as TRANSIENT or RATE_LIMITED, `false` otherwise.
 */
function isRetryable(error: any): boolean {
  switch (classifyError(error)) {
    case ErrorCategory.TRANSIENT:
    case ErrorCategory.RATE_LIMITED:
      return true;
    default:
      return false;
  }
}

/**
 * Computes the wait before the next retry: exponential backoff plus random jitter,
 * capped at `maxDelayMs`.
 *
 * @param attempt Zero-based retry index; the backoff doubles with each increment.
 * @param baseDelayMs Delay for attempt 0 and the upper bound of the added jitter.
 * @param maxDelayMs Largest delay that is ever returned.
 * @returns The delay in milliseconds.
 */
function getBackoffDelay(attempt: number, baseDelayMs: number, maxDelayMs: number): number {
  const backoffMs = baseDelayMs * 2 ** attempt;
  // Jitter in [0, baseDelayMs) spreads out callers that failed at the same moment.
  const jitterMs = Math.random() * baseDelayMs;
  const uncappedMs = backoffMs + jitterMs;
  return Math.min(uncappedMs, maxDelayMs);
}

/**
 * Runs async operations with retry, exponential backoff and a circuit breaker.
 *
 * - Retryable failures are retried up to `maxRetries` times, waiting
 *   `getBackoffDelay(...)` between attempts.
 * - Each `executeWithRetry` call that ultimately fails counts as one failure.
 *   Reaching `failureThreshold` failures (a success while CLOSED resets the
 *   count) opens the circuit.
 * - While OPEN, calls are rejected immediately. After `resetTimeoutMs` the
 *   circuit becomes HALF_OPEN; two successes close it again, one failure
 *   re-opens it.
 */
export class ErrorRecovery {
  private readonly maxRetries: number;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;
  private readonly baseDelayMs: number;
  private readonly maxDelayMs: number;
  private readonly label: string;
  /** Successes required while HALF_OPEN before the circuit closes. */
  private readonly halfOpenRequired = 2;

  private circuitState: CircuitState = CircuitState.CLOSED;
  private failureCount = 0;
  private halfOpenSuccessCount = 0;
  /** Pending OPEN -> HALF_OPEN timer, if any. */
  private resetTimer: ReturnType<typeof setTimeout> | null = null;

  private metrics: RecoveryMetrics = {
    totalAttempts: 0,
    successCount: 0,
    failureCount: 0,
    circuitBreakerTrips: 0,
    lastFailureTime: null,
    lastSuccessTime: null,
  };

  /**
   * @param options Optional overrides; any field left out uses the default shown here.
   */
  constructor(options: ErrorRecoveryOptions = {}) {
    this.maxRetries = options.maxRetries ?? 3;
    this.failureThreshold = options.failureThreshold ?? 5;
    this.resetTimeoutMs = options.resetTimeoutMs ?? 60_000;
    this.baseDelayMs = options.baseDelayMs ?? 1_000;
    this.maxDelayMs = options.maxDelayMs ?? 30_000;
    this.label = options.label ?? 'payment-processor';
  }

  /** @returns The current circuit breaker state. */
  getState(): CircuitState {
    return this.circuitState;
  }

  /** @returns A shallow copy of the counters, so callers cannot mutate internal state. */
  getMetrics(): RecoveryMetrics {
    return { ...this.metrics };
  }

  /**
   * Forces the circuit back to CLOSED, clears the failure counters and cancels
   * a pending HALF_OPEN transition. Metrics are left untouched.
   */
  reset(): void {
    this.circuitState = CircuitState.CLOSED;
    this.failureCount = 0;
    this.halfOpenSuccessCount = 0;
    if (this.resetTimer) {
      clearTimeout(this.resetTimer);
      this.resetTimer = null;
    }
    logger.info({ label: this.label }, 'Error recovery: circuit breaker reset');
  }

  /**
   * Executes `operation`, retrying retryable failures with backoff.
   *
   * @param operation Factory for the work to run; it is invoked again on each retry.
   * @returns The value the operation resolves with.
   * @throws An `Error` flagged with `circuitBreakerOpen: true` when the circuit
   *   is OPEN at call time (the operation is not invoked).
   * @throws The operation's own error when it is not retryable or the retries
   *   are exhausted.
   */
  async executeWithRetry<T>(operation: () => Promise<T>): Promise<T> {
    if (this.circuitState === CircuitState.OPEN) {
      throw Object.assign(new Error(`Circuit breaker is OPEN for ${this.label}`), {
        category: ErrorCategory.TRANSIENT,
        circuitBreakerOpen: true,
      });
    }

    let attempt = 0;
    while (attempt <= this.maxRetries) {
      this.metrics.totalAttempts++;
      try {
        const result = await operation();
        this.onSuccess();
        return result;
      } catch (error) {
        attempt++;
        const category = classifyError(error);
        const retryable = isRetryable(error);

        if (!retryable || attempt > this.maxRetries) {
          this.onFailure();
          logger.error(
            {
              label: this.label,
              attempt,
              category,
              error: ErrorRecovery.describe(error),
              circuitState: this.circuitState,
            },
            'Error recovery: operation failed permanently',
          );
          throw error;
        }

        // `attempt` already counts the failed try, so the first retry waits ~baseDelayMs.
        const delayMs = getBackoffDelay(attempt - 1, this.baseDelayMs, this.maxDelayMs);
        logger.warn(
          {
            label: this.label,
            attempt,
            maxRetries: this.maxRetries,
            delayMs,
            category,
            error: ErrorRecovery.describe(error),
          },
          'Error recovery: retryable error — retrying with backoff',
        );
        await new Promise(resolve => setTimeout(resolve, delayMs));
      }
    }

    // Only reachable when maxRetries is negative, so the loop body never runs.
    const error = new Error(`Max retries (${this.maxRetries}) exceeded for ${this.label}`);
    this.onFailure();
    throw error;
  }

  /** Best-effort, log-friendly text for an arbitrary thrown value. */
  private static describe(error: unknown): string {
    const message = (error as { message?: unknown } | null | undefined)?.message;
    return typeof message === 'string' && message !== '' ? message : String(error);
  }

  /** Records a success and, while HALF_OPEN, counts it toward closing the circuit. */
  private onSuccess(): void {
    this.metrics.successCount++;
    this.metrics.lastSuccessTime = Date.now();

    if (this.circuitState !== CircuitState.HALF_OPEN) {
      this.failureCount = 0;
      return;
    }

    this.halfOpenSuccessCount++;
    if (this.halfOpenSuccessCount < this.halfOpenRequired) {
      return;
    }

    // Capture the count before clearing it so the log reports the real number.
    const trialSuccesses = this.halfOpenSuccessCount;
    this.circuitState = CircuitState.CLOSED;
    this.failureCount = 0;
    this.halfOpenSuccessCount = 0;
    logger.info(
      { label: this.label, successCount: trialSuccesses },
      'Error recovery: circuit breaker CLOSED — service recovered',
    );
  }

  /** Records a failed call and trips the breaker when warranted. */
  private onFailure(): void {
    this.failureCount++;
    this.metrics.failureCount++;
    this.metrics.lastFailureTime = Date.now();

    // A single failure during the trial period re-opens the circuit.
    if (this.circuitState === CircuitState.HALF_OPEN) {
      this.tripCircuitBreaker();
      return;
    }

    if (this.failureCount >= this.failureThreshold) {
      this.tripCircuitBreaker();
    }
  }

  /** Opens the circuit and schedules the transition to HALF_OPEN. */
  private tripCircuitBreaker(): void {
    this.circuitState = CircuitState.OPEN;
    this.metrics.circuitBreakerTrips++;
    this.halfOpenSuccessCount = 0;

    logger.error(
      {
        label: this.label,
        failureCount: this.failureCount,
        resetTimeoutMs: this.resetTimeoutMs,
      },
      'Error recovery: circuit breaker OPEN — pausing operations',
    );

    // Restart the countdown so only one pending transition exists at a time.
    if (this.resetTimer) {
      clearTimeout(this.resetTimer);
    }

    this.resetTimer = setTimeout(() => {
      this.resetTimer = null;
      this.circuitState = CircuitState.HALF_OPEN;
      this.halfOpenSuccessCount = 0;
      logger.info(
        { label: this.label },
        'Error recovery: circuit breaker HALF_OPEN — allowing trial requests',
      );
    }, this.resetTimeoutMs);
  }
}

/**
 * Builds an `ErrorRecovery` preconfigured for the payment processor.
 *
 * @param options Overrides merged on top of the presets below.
 * @returns A new `ErrorRecovery` instance.
 */
export function createPaymentProcessorRecovery(options: ErrorRecoveryOptions = {}): ErrorRecovery {
  const presets: ErrorRecoveryOptions = {
    label: 'payment-processor',
    maxRetries: 3,
    failureThreshold: 5,
    resetTimeoutMs: 60_000,
    baseDelayMs: 1_000,
    maxDelayMs: 30_000,
  };
  const merged: ErrorRecoveryOptions = { ...presets, ...options };
  return new ErrorRecovery(merged);
}

/**
 * Builds an `ErrorRecovery` preconfigured for Horizon API calls.
 *
 * Each preset is applied with `??` rather than an object spread, so an option
 * passed explicitly as `undefined` keeps the Horizon preset instead of falling
 * through to the constructor's generic defaults.
 *
 * @param options Overrides for individual presets.
 * @returns A new `ErrorRecovery` instance.
 */
export function createHorizonRecovery(options: ErrorRecoveryOptions = {}): ErrorRecovery {
  return new ErrorRecovery({
    label: options.label ?? 'horizon-api',
    maxRetries: options.maxRetries ?? 3,
    failureThreshold: options.failureThreshold ?? 10,
    resetTimeoutMs: options.resetTimeoutMs ?? 120_000,
    baseDelayMs: options.baseDelayMs ?? 500,
    maxDelayMs: options.maxDelayMs ?? 15_000,
  });
}

/**
 * Builds an `ErrorRecovery` preconfigured for database operations.
 *
 * Each preset is applied with `??` rather than an object spread, so an option
 * passed explicitly as `undefined` keeps the database preset instead of falling
 * through to the constructor's generic defaults.
 *
 * @param options Overrides for individual presets.
 * @returns A new `ErrorRecovery` instance.
 */
export function createDatabaseRecovery(options: ErrorRecoveryOptions = {}): ErrorRecovery {
  return new ErrorRecovery({
    label: options.label ?? 'database',
    maxRetries: options.maxRetries ?? 2,
    failureThreshold: options.failureThreshold ?? 5,
    resetTimeoutMs: options.resetTimeoutMs ?? 60_000,
    baseDelayMs: options.baseDelayMs ?? 150,
    maxDelayMs: options.maxDelayMs ?? 5_000,
  });
}
Loading
Loading