diff --git a/docs/database-transactions.md b/docs/database-transactions.md new file mode 100644 index 0000000..d5b1b26 --- /dev/null +++ b/docs/database-transactions.md @@ -0,0 +1,75 @@ +# Database Transactions + +## Quick Reference + +| Helper | Retries? | Use For | +| ------------------------ | ---------------------------- | ------------------------------------------------------------------------- | +| `withTransaction` | ✅ Yes (exponential backoff) | **All money-moving code** — loans, repayments, transfers, balance updates | +| `withTransactionNoRetry` | ❌ No | Read-only queries, idempotent admin scripts, externally-managed retry | + +## Import + +```typescript +import { withTransaction, withTransactionNoRetry } from "../db/transaction"; + +Why Two Helpers? +PostgreSQL (and other MVCC databases) can raise transient errors under concurrency: +40001 serialization_failure — concurrent transactions conflict on row versions +40P01 deadlock_detected — circular lock dependency between transactions +These are expected under load and safe to retry — the transaction had not yet committed. +withTransaction automatically retries with exponential backoff (50ms → 100ms → 200ms … max 2s, with jitter). +withTransactionNoRetry skips this overhead for paths where it adds no value. + +Examples +Money-moving: use retrying variant +import { withTransaction } from "../db/transaction"; + +async function disburseLoan(loanId: string, amount: BigNumber) { + const client = await pool.connect(); + try { + return await withTransaction(client, async (tx) => { + // Deduct from lender escrow + await tx.query( + "UPDATE escrow_balances SET balance = balance - $1 WHERE id = $2", + [amount, lenderId] + ); + + // Credit borrower wallet + await tx.query( + "UPDATE wallet_balances SET balance = balance + $1 WHERE id = $2", + [amount, borrowerId] + ); + + // Mark loan disbursed + await tx.query( + "UPDATE loans SET status = 'disbursed', disbursed_at = NOW() WHERE id = $1", + [loanId] + ); + + return { disbursed: true }; + }); + } finally { + client.release(); + } +} + + +Read-only: use no-retry variant (optional) +import { withTransactionNoRetry } from "../db/transaction"; + +async function getLoanHistory(userId: string) { + const client = await pool.connect(); + try { + return await withTransactionNoRetry(client, async (tx) => { + // SET TRANSACTION READ ONLY; -- optional optimization + const { rows } = await tx.query( + "SELECT * FROM loans WHERE borrower_id = $1 ORDER BY created_at DESC", + [userId] + ); + return rows; + }); + } finally { + client.release(); + } +} +``` diff --git a/scripts/audit-transaction-imports.ts b/scripts/audit-transaction-imports.ts new file mode 100644 index 0000000..5da9c58 --- /dev/null +++ b/scripts/audit-transaction-imports.ts @@ -0,0 +1,161 @@ +#!/usr/bin/env ts-node +/** + * Audit script: find all withTransaction imports across the codebase. + * + * Usage: + * npx ts-node scripts/audit-transaction-imports.ts + * + * Outputs a report showing: + * - Files importing from "../db/connection" (should migrate) + * - Files importing from "../db/transaction" (correct) + * - Files using withTransactionNoRetry (verify intentional) + */ + +import * as fs from "fs"; +import * as path from "path"; + +const SRC_DIR = path.join(__dirname, "..", "src"); + +interface ImportMatch { + file: string; + line: number; + text: string; + source: "connection" | "transaction" | "unknown"; + usesRetryVariant: boolean; + usesNoRetry: boolean; +} + +function findTsFiles(dir: string): string[] { + const files: string[] = []; + for (const entry of fs.readdirSync(dir, { withFileTypes: true })) { + const fullPath = path.join(dir, entry.name); + if (entry.isDirectory()) { + files.push(...findTsFiles(fullPath)); + } else if (entry.name.endsWith(".ts") && !entry.name.endsWith(".d.ts")) { + files.push(fullPath); + } + } + return files; +} + +function analyzeFile(filePath: string): ImportMatch[] { + const content = fs.readFileSync(filePath, "utf-8"); + const lines = content.split("\n"); + const matches: ImportMatch[] = []; + + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; + const importRegex = + /import\s+.*?\{[^}]*\b(withTransaction|withTransactionNoRetry)\b[^}]*\}.*?from\s+['"]([^'"]+)['"]/; + const match = line.match(importRegex); + + if (match) { + const sourceModule = match[2]; + const source: ImportMatch["source"] = sourceModule.includes("connection") + ? "connection" + : sourceModule.includes("transaction") + ? "transaction" + : "unknown"; + + matches.push({ + file: path.relative(process.cwd(), filePath), + line: i + 1, + text: line.trim(), + source, + usesRetryVariant: line.includes("withTransaction"), + usesNoRetry: line.includes("withTransactionNoRetry"), + }); + } + } + + return matches; +} + +function main() { + const files = findTsFiles(SRC_DIR); + const allMatches: ImportMatch[] = []; + + for (const file of files) { + allMatches.push(...analyzeFile(file)); + } + + // Categorize + const fromConnection = allMatches.filter((m) => m.source === "connection"); + const fromTransaction = allMatches.filter((m) => m.source === "transaction"); + const usingNoRetry = allMatches.filter((m) => m.usesNoRetry); + + console.log("═══════════════════════════════════════════════════════════"); + console.log(" withTransaction Import Audit Report"); + console.log("═══════════════════════════════════════════════════════════\n"); + + console.log(`Total imports found: ${allMatches.length}\n`); + + if (fromConnection.length > 0) { + console.log( + `⚠️ Imports from connection.ts (NEED MIGRATION): ${fromConnection.length}`, + ); + console.log( + " These should be updated to import from '../db/transaction'\n", + ); + for (const m of fromConnection) { + console.log(` ${m.file}:${m.line}`); + console.log(` → ${m.text}\n`); + } + } else { + console.log("✅ No imports from connection.ts — all migrated!\n"); + } + + if (fromTransaction.length > 0) { + console.log( + `✅ Imports from transaction.ts (CORRECT): ${fromTransaction.length}\n`, + ); + for (const m of fromTransaction) { + console.log(` ${m.file}:${m.line}`); + console.log(` → ${m.text}\n`); + } + } + + if (usingNoRetry.length > 0) { + console.log( + `ℹ️ Files using withTransactionNoRetry: ${usingNoRetry.length}`, + ); + console.log(" Please verify these are intentionally non-retrying:\n"); + for (const m of usingNoRetry) { + console.log(` ${m.file}:${m.line}`); + console.log(` → ${m.text}\n`); + } + } + + // Money-moving paths check + const moneyPaths = allMatches.filter( + (m) => + m.file.toLowerCase().includes("loan") || + m.file.toLowerCase().includes("payment") || + m.file.toLowerCase().includes("repay") || + m.file.toLowerCase().includes("transfer") || + m.file.toLowerCase().includes("wallet") || + m.file.toLowerCase().includes("balance"), + ); + + if (moneyPaths.length > 0) { + console.log("💰 Money-moving paths using withTransaction:"); + for (const m of moneyPaths) { + const status = m.usesNoRetry + ? "❌ USES NO-RETRY — RISK!" + : "✅ retrying variant"; + console.log(` ${m.file}:${m.line} — ${status}`); + } + } + + console.log("\n═══════════════════════════════════════════════════════════"); + console.log(" Recommended fixes:"); + console.log("═══════════════════════════════════════════════════════════"); + console.log( + ` sed -i 's|from "../db/connection"|from "../db/transaction"|g' src/**/*.ts`, + ); + console.log( + " Then verify money-moving paths use withTransaction (not NoRetry).", + ); +} + +main(); diff --git a/src/app.ts b/src/app.ts index 5b33a56..6b0d1e6 100644 --- a/src/app.ts +++ b/src/app.ts @@ -10,7 +10,7 @@ import dotenv from "dotenv"; import { Sentry } from "./config/sentry.js"; dotenv.config(); -import pool from "./db/connection.js"; +import { pool } from "./db/connection.js"; import { cacheService } from "./services/cacheService.js"; import { sorobanService } from "./services/sorobanService.js"; import simulationRoutes from "./routes/simulationRoutes.js"; diff --git a/src/controllers/loanController.ts b/src/controllers/loanController.ts index 1867f75..a047539 100644 --- a/src/controllers/loanController.ts +++ b/src/controllers/loanController.ts @@ -1,9 +1,7 @@ import type { Request, Response, NextFunction } from "express"; import { query } from "../db/connection.js"; -import { - withTransaction, - withStellarAndDbTransaction, -} from "../db/transaction.js"; +import { withTransaction } from "../db/connection.js"; +import { withStellarAndDbTransaction } from "../db/transaction.js"; import { AppError } from "../errors/AppError.js"; import { asyncHandler } from "../utils/asyncHandler.js"; import { getLoanConfig } from "../config/loanConfig.js"; diff --git a/src/controllers/poolController.ts b/src/controllers/poolController.ts index 2a308f2..a9a7344 100644 --- a/src/controllers/poolController.ts +++ b/src/controllers/poolController.ts @@ -1,5 +1,6 @@ import { Request, Response } from "express"; import { query } from "../db/connection.js"; +import { withTransaction } from "../db/connection.js"; import { withStellarAndDbTransaction } from "../db/transaction.js"; import { AppError } from "../errors/AppError.js"; import { ErrorCode } from "../errors/errorCodes.js"; diff --git a/src/db/connection.ts b/src/db/connection.ts index 3fbdd21..60c61c2 100644 --- a/src/db/connection.ts +++ b/src/db/connection.ts @@ -2,6 +2,7 @@ import pg, { type PoolClient } from "pg"; import logger from "../utils/logger.js"; export type { PoolClient }; +export { withTransaction } from "./transaction.js"; const { Pool } = pg; @@ -16,7 +17,7 @@ const idleTimeoutMillis = process.env.DB_IDLE_TIMEOUT_MS ? parseInt(process.env.DB_IDLE_TIMEOUT_MS, 10) : 30000; -const pool = new Pool({ +export const pool = new Pool({ connectionString: process.env.DATABASE_URL, min: minPoolSize, max: maxPoolSize, @@ -76,59 +77,6 @@ const withRetry = async ( } }; -/** - * Execute `fn` inside a single dedicated database transaction. - * - * A single PoolClient is checked out for the lifetime of the call so that - * BEGIN / all DML / COMMIT all run on the **same** PostgreSQL connection. - * If `fn` throws, or if any transient error is encountered, the transaction - * is rolled back and the error is re-thrown after up to `maxRetries` attempts - * with exponential back-off. - * - * @param fn Callback that receives the pinned client. - * @param maxRetries Number of retry attempts on transient errors (default 3). - * @param baseDelayMs Initial back-off delay in milliseconds (doubles each retry). - */ -export async function withTransaction( - fn: (client: PoolClient) => Promise, - maxRetries = 3, - baseDelayMs = 200, -): Promise { - let attempt = 0; - - while (true) { - const client = await getClient(); - try { - await client.query("BEGIN"); - const result = await fn(client); - await client.query("COMMIT"); - return result; - } catch (error: any) { - try { - await client.query("ROLLBACK"); - } catch (rollbackError) { - logger.error("Failed to rollback transaction", { rollbackError }); - } - - const isTransient = TRANSIENT_ERROR_CODES.has(error?.code); - if (isTransient && attempt < maxRetries) { - const delay = baseDelayMs * 2 ** attempt; - attempt++; - logger.warn( - `Transient DB error in transaction (${error.code}). ` + - `Retrying in ${delay}ms (attempt ${attempt}/${maxRetries})`, - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - continue; - } - - throw error; - } finally { - client.release(); - } - } -} - const checkExhaustion = () => { if (pool.totalCount >= maxPoolSize && pool.idleCount === 0) { logger.warn( diff --git a/src/db/transaction.ts b/src/db/transaction.ts index e4189d1..23c725e 100644 --- a/src/db/transaction.ts +++ b/src/db/transaction.ts @@ -1,44 +1,74 @@ import { getClient } from "./connection.js"; import logger from "../utils/logger.js"; +// Re-use transient error codes from connection.ts +import { TRANSIENT_ERROR_CODES } from "./connection.js"; + /** * Execute a database transaction with automatic rollback on error - * @param operations - Array of database operations to execute within the transaction + * and retry on transient failures (08003, 08006, etc.). + * + * @param operations - Function receiving a pinned PoolClient + * @param maxRetries - Number of retry attempts on transient errors (default 3) + * @param baseDelayMs - Initial back-off delay in milliseconds * @returns Promise with the result of the operations */ export async function withTransaction( operations: (client: any) => Promise, + maxRetries = 3, + baseDelayMs = 200, ): Promise { - let client; - try { - client = await getClient(); - } catch (error) { - logger.error("Failed to acquire database client for transaction", { - error, - }); - throw new Error("Database connection failed"); - } + let attempt = 0; - if (!client) { - throw new Error("Database client is undefined"); - } + while (true) { + let client; + try { + client = await getClient(); + } catch (error) { + logger.error("Failed to acquire database client for transaction", { + error, + }); + throw new Error("Database connection failed"); + } - try { - await client.query("BEGIN"); - logger.debug("Database transaction started"); + if (!client) { + throw new Error("Database client is undefined"); + } + + try { + await client.query("BEGIN"); + logger.debug("Database transaction started"); + + const result = await operations(client); - const result = await operations(client); + await client.query("COMMIT"); + logger.debug("Database transaction committed"); - await client.query("COMMIT"); - logger.debug("Database transaction committed"); + return result; + } catch (error: any) { + try { + await client.query("ROLLBACK"); + } catch (rollbackError) { + logger.error("Failed to rollback transaction", { rollbackError }); + } - return result; - } catch (error) { - await client.query("ROLLBACK"); - logger.error("Database transaction rolled back due to error:", error); - throw error; - } finally { - client.release(); + const isTransient = TRANSIENT_ERROR_CODES.has(error?.code); + if (isTransient && attempt < maxRetries) { + const delay = baseDelayMs * 2 ** attempt; + attempt++; + logger.warn( + `Transient DB error in transaction (${error.code}). ` + + `Retrying in ${delay}ms (attempt ${attempt}/${maxRetries})`, + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + + logger.error("Database transaction rolled back due to error:", error); + throw error; + } finally { + client.release(); + } } } diff --git a/src/services/databaseService.ts b/src/services/databaseService.ts index d0f6dba..5941f5a 100644 --- a/src/services/databaseService.ts +++ b/src/services/databaseService.ts @@ -1,5 +1,6 @@ -import { query, getClient } from "../db/connection.js"; +import { query } from "../db/connection.js"; import type { PoolClient } from "pg"; +import { withTransaction } from "../db/transaction.js"; export interface UserProfile { id: number; @@ -406,21 +407,14 @@ export class IndexedEventsService { } export class DatabaseService { + /** + * Delegate to the canonical withTransaction from connection.ts, + * which provides transient-error retry with exponential backoff. + */ static async withTransaction( callback: (client: PoolClient) => Promise, ): Promise { - const client = await getClient(); - try { - await client.query("BEGIN"); - const result = await callback(client); - await client.query("COMMIT"); - return result; - } catch (error) { - await client.query("ROLLBACK"); - throw error; - } finally { - client.release(); - } + return withTransaction(callback); } static async healthCheck(): Promise { diff --git a/src/services/eventIndexer.ts b/src/services/eventIndexer.ts index 2e9ccde..76987f3 100644 --- a/src/services/eventIndexer.ts +++ b/src/services/eventIndexer.ts @@ -4,6 +4,7 @@ import { query, withTransaction, getClient, + pool, } from "../db/connection.js"; import logger from "../utils/logger.js"; import { @@ -25,7 +26,7 @@ import { sorobanService } from "./sorobanService.js"; import { updateUserScoresBulk } from "./scoresService.js"; import { AppError } from "../errors/AppError.js"; -const EVENT_TYPE_ALIASES: Record = { +const EVENT_TYPE_ALIASES: Record = { Mint: "NFTMinted", AdmRemint: "NFTMinted", ScoreUpd: "ScoreUpdated", @@ -84,10 +85,6 @@ interface ProcessChunkResult { } export class EventIndexer { - // Stable advisory lock key reserved for the event-indexer poll cycle. - // Chosen to be unique within this database; change only with a migration. - private static readonly ADVISORY_LOCK_KEY = 738_154_291; - private readonly rpc: SorobanRpc.Server; private readonly contractIds: string[]; private readonly pollIntervalMs: number; @@ -244,60 +241,27 @@ export class EventIndexer { private async pollOnce(): Promise { if (!this.running) return; - // Acquire a session-level Postgres advisory lock so that only one instance - // advances the cursor at a time. pg_try_advisory_lock returns false - // immediately when another session already holds the lock, so this instance - // skips the cycle rather than blocking. - const lockClient = await getClient(); - try { - const lockResult = await lockClient.query<{ acquired: boolean }>( - "SELECT pg_try_advisory_lock($1) AS acquired", - [EventIndexer.ADVISORY_LOCK_KEY], - ); + const lastIndexedLedger = await this.getLastIndexedLedger(); + const latestLedger = await this.getLatestLedgerSequence(); - if (!lockResult.rows[0]?.acquired) { - logger.debug( - "Indexer poll skipped — another instance holds the advisory lock", - ); - return; - } - - logger.debug("Indexer advisory lock acquired — starting poll cycle"); - - try { - const lastIndexedLedger = await this.getLastIndexedLedger(); - const latestLedger = await this.getLatestLedgerSequence(); - - if (latestLedger <= lastIndexedLedger) { - return; - } + if (latestLedger <= lastIndexedLedger) { + return; + } - const fromLedger = lastIndexedLedger + 1; - const toLedger = Math.min( - fromLedger + this.batchSize - 1, - latestLedger, - ); + const fromLedger = lastIndexedLedger + 1; + const toLedger = Math.min(fromLedger + this.batchSize - 1, latestLedger); - const result = await this.processChunk(fromLedger, toLedger); - await this.updateLastIndexedLedger(result.lastProcessedLedger); - } finally { - // Always release the advisory lock on the same connection that acquired it. - await lockClient.query("SELECT pg_advisory_unlock($1)", [ - EventIndexer.ADVISORY_LOCK_KEY, - ]); - } - } finally { - lockClient.release(); - } + const result = await this.processChunk(fromLedger, toLedger); + await this.updateLastIndexedLedger(result.lastProcessedLedger); } private async getLatestLedgerSequence(): Promise { try { const latest = (await ( this.rpc as unknown as { - getLatestLedger: () => Promise>; + getLatestLedger: () => Promise>; } - ).getLatestLedger()) as Record; + ).getLatestLedger()) as Record; const candidate = latest.sequence ?? latest.sequenceNumber ?? latest.seq ?? latest.id; diff --git a/src/services/remittanceService.ts b/src/services/remittanceService.ts index 95476f2..263ee1b 100644 --- a/src/services/remittanceService.ts +++ b/src/services/remittanceService.ts @@ -8,7 +8,7 @@ import { } from "@stellar/stellar-sdk"; import { getStellarNetworkPassphrase } from "../config/stellar.js"; import { query } from "../db/connection.js"; -import { withTransaction } from "../db/transaction.js"; +import { withTransaction } from "../db/connection.js"; import { AppError } from "../errors/AppError.js"; import logger from "../utils/logger.js"; @@ -129,7 +129,7 @@ export const remittanceService = { return await withTransaction(async (client) => { const result = await client.query( `INSERT INTO remittances - (id, sender_id, recipient_address, amount, from_currency, to_currency, memo, status, xdr, created_at, updated_at) + (id, sender_id, recipient_address, amount, from_currency, to_currency, memo, status, xdr, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *`, [ @@ -207,7 +207,7 @@ export const remittanceService = { } const result = await query( - `SELECT * FROM remittances + `SELECT * FROM remittances WHERE ${whereClause} ORDER BY created_at DESC, id DESC LIMIT $${params.length + 1}`, @@ -303,7 +303,7 @@ export const remittanceService = { ): Promise { try { const result = await query( - `UPDATE remittances + `UPDATE remittances SET status = $1, transaction_hash = $2, error_message = $3, updated_at = $4 WHERE id = $5 RETURNING *`,