Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions docs/database-transactions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Database Transactions

## Quick Reference

| Helper | Retries? | Use For |
| ------------------------ | ---------------------------- | ------------------------------------------------------------------------- |
| `withTransaction` | ✅ Yes (exponential backoff) | **All money-moving code** — loans, repayments, transfers, balance updates |
| `withTransactionNoRetry` | ❌ No | Read-only queries, idempotent admin scripts, externally-managed retry |

## Import

```typescript
import { withTransaction, withTransactionNoRetry } from "../db/transaction";

Why Two Helpers?
PostgreSQL (and other MVCC databases) can raise transient errors under concurrency:
40001 serialization_failure — concurrent transactions conflict on row versions
40P01 deadlock_detected — circular lock dependency between transactions
These are expected under load and safe to retry — the transaction had not yet committed.
withTransaction automatically retries with exponential backoff (50ms → 100ms → 200ms … max 2s, with jitter).
withTransactionNoRetry skips this overhead for paths where it adds no value.

Examples
Money-moving: use retrying variant
import { withTransaction } from "../db/transaction";

async function disburseLoan(loanId: string, amount: BigNumber) {
const client = await pool.connect();
try {
return await withTransaction(client, async (tx) => {
// Deduct from lender escrow
await tx.query(
"UPDATE escrow_balances SET balance = balance - $1 WHERE id = $2",
[amount, lenderId]
);

// Credit borrower wallet
await tx.query(
"UPDATE wallet_balances SET balance = balance + $1 WHERE id = $2",
[amount, borrowerId]
);

// Mark loan disbursed
await tx.query(
"UPDATE loans SET status = 'disbursed', disbursed_at = NOW() WHERE id = $1",
[loanId]
);

return { disbursed: true };
});
} finally {
client.release();
}
}


Read-only: use no-retry variant (optional)
import { withTransactionNoRetry } from "../db/transaction";

async function getLoanHistory(userId: string) {
const client = await pool.connect();
try {
return await withTransactionNoRetry(client, async (tx) => {
// SET TRANSACTION READ ONLY; -- optional optimization
const { rows } = await tx.query(
"SELECT * FROM loans WHERE borrower_id = $1 ORDER BY created_at DESC",
[userId]
);
return rows;
});
} finally {
client.release();
}
}
```
161 changes: 161 additions & 0 deletions scripts/audit-transaction-imports.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#!/usr/bin/env ts-node
/**
* Audit script: find all withTransaction imports across the codebase.
*
* Usage:
* npx ts-node scripts/audit-transaction-imports.ts
*
* Outputs a report showing:
* - Files importing from "../db/connection" (should migrate)
* - Files importing from "../db/transaction" (correct)
* - Files using withTransactionNoRetry (verify intentional)
*/

import * as fs from "fs";
import * as path from "path";

const SRC_DIR = path.join(__dirname, "..", "src");

interface ImportMatch {
file: string;
line: number;
text: string;
source: "connection" | "transaction" | "unknown";
usesRetryVariant: boolean;
usesNoRetry: boolean;
}

function findTsFiles(dir: string): string[] {
const files: string[] = [];
for (const entry of fs.readdirSync(dir, { withFileTypes: true })) {
const fullPath = path.join(dir, entry.name);
if (entry.isDirectory()) {
files.push(...findTsFiles(fullPath));
} else if (entry.name.endsWith(".ts") && !entry.name.endsWith(".d.ts")) {
files.push(fullPath);
}
}
return files;
}

function analyzeFile(filePath: string): ImportMatch[] {
const content = fs.readFileSync(filePath, "utf-8");
const lines = content.split("\n");
const matches: ImportMatch[] = [];

for (let i = 0; i < lines.length; i++) {
const line = lines[i];
const importRegex =
/import\s+.*?\{[^}]*\b(withTransaction|withTransactionNoRetry)\b[^}]*\}.*?from\s+['"]([^'"]+)['"]/;
const match = line.match(importRegex);

if (match) {
const sourceModule = match[2];
const source: ImportMatch["source"] = sourceModule.includes("connection")
? "connection"
: sourceModule.includes("transaction")
? "transaction"
: "unknown";

matches.push({
file: path.relative(process.cwd(), filePath),
line: i + 1,
text: line.trim(),
source,
usesRetryVariant: line.includes("withTransaction"),
usesNoRetry: line.includes("withTransactionNoRetry"),
});
}
}

return matches;
}

function main() {
const files = findTsFiles(SRC_DIR);
const allMatches: ImportMatch[] = [];

for (const file of files) {
allMatches.push(...analyzeFile(file));
}

// Categorize
const fromConnection = allMatches.filter((m) => m.source === "connection");
const fromTransaction = allMatches.filter((m) => m.source === "transaction");
const usingNoRetry = allMatches.filter((m) => m.usesNoRetry);

console.log("═══════════════════════════════════════════════════════════");
console.log(" withTransaction Import Audit Report");
console.log("═══════════════════════════════════════════════════════════\n");

console.log(`Total imports found: ${allMatches.length}\n`);

if (fromConnection.length > 0) {
console.log(
`⚠️ Imports from connection.ts (NEED MIGRATION): ${fromConnection.length}`,
);
console.log(
" These should be updated to import from '../db/transaction'\n",
);
for (const m of fromConnection) {
console.log(` ${m.file}:${m.line}`);
console.log(` → ${m.text}\n`);
}
} else {
console.log("✅ No imports from connection.ts — all migrated!\n");
}

if (fromTransaction.length > 0) {
console.log(
`✅ Imports from transaction.ts (CORRECT): ${fromTransaction.length}\n`,
);
for (const m of fromTransaction) {
console.log(` ${m.file}:${m.line}`);
console.log(` → ${m.text}\n`);
}
}

if (usingNoRetry.length > 0) {
console.log(
`ℹ️ Files using withTransactionNoRetry: ${usingNoRetry.length}`,
);
console.log(" Please verify these are intentionally non-retrying:\n");
for (const m of usingNoRetry) {
console.log(` ${m.file}:${m.line}`);
console.log(` → ${m.text}\n`);
}
}

// Money-moving paths check
const moneyPaths = allMatches.filter(
(m) =>
m.file.toLowerCase().includes("loan") ||
m.file.toLowerCase().includes("payment") ||
m.file.toLowerCase().includes("repay") ||
m.file.toLowerCase().includes("transfer") ||
m.file.toLowerCase().includes("wallet") ||
m.file.toLowerCase().includes("balance"),
);

if (moneyPaths.length > 0) {
console.log("💰 Money-moving paths using withTransaction:");
for (const m of moneyPaths) {
const status = m.usesNoRetry
? "❌ USES NO-RETRY — RISK!"
: "✅ retrying variant";
console.log(` ${m.file}:${m.line} — ${status}`);
}
}

console.log("\n═══════════════════════════════════════════════════════════");
console.log(" Recommended fixes:");
console.log("═══════════════════════════════════════════════════════════");
console.log(
` sed -i 's|from "../db/connection"|from "../db/transaction"|g' src/**/*.ts`,
);
console.log(
" Then verify money-moving paths use withTransaction (not NoRetry).",
);
}

main();
2 changes: 1 addition & 1 deletion src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import dotenv from "dotenv";
import { Sentry } from "./config/sentry.js";

dotenv.config();
import pool from "./db/connection.js";
import { pool } from "./db/connection.js";
import { cacheService } from "./services/cacheService.js";
import { sorobanService } from "./services/sorobanService.js";
import simulationRoutes from "./routes/simulationRoutes.js";
Expand Down
6 changes: 2 additions & 4 deletions src/controllers/loanController.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import type { Request, Response, NextFunction } from "express";
import { query } from "../db/connection.js";
import {
withTransaction,
withStellarAndDbTransaction,
} from "../db/transaction.js";
import { withTransaction } from "../db/connection.js";
import { withStellarAndDbTransaction } from "../db/transaction.js";
import { AppError } from "../errors/AppError.js";
import { asyncHandler } from "../utils/asyncHandler.js";
import { getLoanConfig } from "../config/loanConfig.js";
Expand Down
1 change: 1 addition & 0 deletions src/controllers/poolController.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Request, Response } from "express";
import { query } from "../db/connection.js";
import { withTransaction } from "../db/connection.js";
import { withStellarAndDbTransaction } from "../db/transaction.js";
import { AppError } from "../errors/AppError.js";
import { ErrorCode } from "../errors/errorCodes.js";
Expand Down
56 changes: 2 additions & 54 deletions src/db/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import pg, { type PoolClient } from "pg";
import logger from "../utils/logger.js";

export type { PoolClient };
export { withTransaction } from "./transaction.js";

const { Pool } = pg;

Expand All @@ -16,7 +17,7 @@ const idleTimeoutMillis = process.env.DB_IDLE_TIMEOUT_MS
? parseInt(process.env.DB_IDLE_TIMEOUT_MS, 10)
: 30000;

const pool = new Pool({
export const pool = new Pool({
connectionString: process.env.DATABASE_URL,
min: minPoolSize,
max: maxPoolSize,
Expand Down Expand Up @@ -76,59 +77,6 @@ const withRetry = async <T>(
}
};

/**
* Execute `fn` inside a single dedicated database transaction.
*
* A single PoolClient is checked out for the lifetime of the call so that
* BEGIN / all DML / COMMIT all run on the **same** PostgreSQL connection.
* If `fn` throws, or if any transient error is encountered, the transaction
* is rolled back and the error is re-thrown after up to `maxRetries` attempts
* with exponential back-off.
*
* @param fn Callback that receives the pinned client.
* @param maxRetries Number of retry attempts on transient errors (default 3).
* @param baseDelayMs Initial back-off delay in milliseconds (doubles each retry).
*/
export async function withTransaction<T>(
fn: (client: PoolClient) => Promise<T>,
maxRetries = 3,
baseDelayMs = 200,
): Promise<T> {
let attempt = 0;

while (true) {
const client = await getClient();
try {
await client.query("BEGIN");
const result = await fn(client);
await client.query("COMMIT");
return result;
} catch (error: any) {
try {
await client.query("ROLLBACK");
} catch (rollbackError) {
logger.error("Failed to rollback transaction", { rollbackError });
}

const isTransient = TRANSIENT_ERROR_CODES.has(error?.code);
if (isTransient && attempt < maxRetries) {
const delay = baseDelayMs * 2 ** attempt;
attempt++;
logger.warn(
`Transient DB error in transaction (${error.code}). ` +
`Retrying in ${delay}ms (attempt ${attempt}/${maxRetries})`,
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}

throw error;
} finally {
client.release();
}
}
}

const checkExhaustion = () => {
if (pool.totalCount >= maxPoolSize && pool.idleCount === 0) {
logger.warn(
Expand Down
Loading