Store transaction inputs/outputs address as string #1116

Klakurka merged 8 commits into PayButton:master from
Conversation
This relaxes some constraints on the database and improves performance.
📝 Walkthrough

Refactors address storage by denormalizing addresses into string columns on TransactionInput and TransactionOutput, adding a migration to backfill and remove the old FK/indexes and orphaned Address rows; updates the Prisma schema, service logic (chronikService/transactionService), batching constants, job sequencing, and related tests to use flat address strings.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Chronik as ChronikService
    participant TxSvc as TransactionService
    participant DB as Database (Prisma)
    participant Worker as BlockchainWorker
    participant Prices as PricesSyncJob
    Chronik->>TxSvc: Parse chronik txs -> produce transactions with flat address strings
    TxSvc->>DB: Bulk create transactions
    TxSvc->>DB: Bulk create transaction inputs/outputs (address strings)
    DB-->>TxSvc: Created transaction IDs
    Chronik->>Worker: Trigger blockchain sync
    Worker->>Chronik: Pull blocks/txs
    Worker-->>Prices: onComplete callback -> enqueue prices sync
    Prices->>DB: Connect prices to created transactions
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
This applies to the chronik calls and the DB batch sizes. It avoids long DB transactions impacting performance and allows work on the chronik transactions to start sooner.
This avoids initiating a DB transaction for each tx, favoring bulk operations instead. Since it also removes the batch processing, it eliminates the DB connection-limit issue entirely when an address has too many txs.
This is a 2-step optimization:
- remove unneeded async in satoshisToUnit() call sites, which reduces locks
- optimize the inputs and outputs processing by avoiding looping several times, copying arrays, and doing redundant formatting/checks

This also fixes a logging error and a loop exit condition that could potentially cause an infinite loop. Inspired by PayButton@6298e84 and PayButton@c4cb339. This is a ~500x improvement on my machine.
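The single-pass idea described in this commit can be illustrated in isolation. The sketch below is hypothetical (`aggregateByAddress` and the `Output` shape are not from the PayButton codebase) but shows the shape of the optimization: one loop that filters and accumulates at once, instead of separate filter/map/reduce passes that each copy the array.

```typescript
// Hypothetical sketch of single-pass processing: aggregate output amounts
// per address in one loop instead of chained filter/map/reduce passes.
interface Output { address?: string, sats: bigint }

function aggregateByAddress (outputs: Output[]): Map<string, bigint> {
  const totals = new Map<string, bigint>()
  for (const out of outputs) {
    // Skip outputs whose script could not be decoded to an address.
    if (out.address === undefined || out.address === '') continue
    totals.set(out.address, (totals.get(out.address) ?? 0n) + out.sats)
  }
  return totals
}
```

Each output is visited exactly once and no intermediate arrays are allocated, which is where a large speedup comes from on transactions with many outputs.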
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
jobs/workers.ts (1)
49-58: ⚠️ Potential issue | 🟡 Minor

Unhandled rejection risk in the `onComplete` callback.

The `onComplete?.()` call is inside a `void (async () => {...})()` IIFE. If `onComplete` throws or rejects, the error will become an unhandled promise rejection with no logging.

🛡️ Proposed fix to add error handling

```diff
 worker.on('completed', (job) => {
   // teardown
   void (async () => {
     console.log('Cleaning up MultiBlockchainClient global instance...')
     await multiBlockchainClient.destroy()
     console.log('Done.')
     console.log(`job ${job.id as string}: blockchain + prices sync finished`)
-    await onComplete?.()
+    try {
+      await onComplete?.()
+    } catch (err: any) {
+      console.error(`onComplete callback failed: ${err.message as string}`)
+    }
   })()
 })
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@jobs/workers.ts` around lines 49 - 58, The IIFE passed to worker.on('completed') awaits multiBlockchainClient.destroy() but calls onComplete?.() without error handling, risking unhandled promise rejections; update the async IIFE in the worker.on('completed') handler to await onComplete() inside a try/catch (or .catch) block, e.g., after awaiting multiBlockchainClient.destroy() call await onComplete?.() within try { ... } catch (err) { console.error('onComplete failed', err) } so any thrown/rejected error from onComplete is caught and logged.
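In isolation, the pattern the fix proposes — awaiting an optional async callback inside try/catch during teardown — can be sketched like this (a hypothetical `runTeardown` helper, not code from the PR):

```typescript
// Hypothetical sketch: await an optional async callback inside try/catch so a
// rejection is logged instead of becoming an unhandled promise rejection.
type OnComplete = (() => Promise<void>) | undefined

async function runTeardown (onComplete: OnComplete, log: (msg: string) => void): Promise<void> {
  log('teardown done')
  try {
    // Optional chaining: if onComplete is undefined, nothing is awaited.
    await onComplete?.()
  } catch (err: any) {
    // Without this catch, a rejection here would escape a void IIFE unhandled.
    log(`onComplete callback failed: ${err.message as string}`)
  }
}
```

A rejecting callback is reduced to a log line, and an absent callback is a no-op, which is exactly the behavior the review asks for.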
🧹 Nitpick comments (4)
services/chronikService.ts (1)
529-573: Consider extracting shared logic between `getSortedInputAddresses` and `getSortedOutputAddresses`.

These two methods are nearly identical - they differ only in iterating over `transaction.inputs` vs `transaction.outputs`. This could be consolidated into a helper function to reduce duplication.

♻️ Example refactor

```typescript
private getSortedAddresses (
  items: Array<{ outputScript?: string, sats: bigint }>
): Array<{ address: string, index: number, amount: Prisma.Decimal }> {
  const addressSatsMap = new Map<string, bigint>()
  items.forEach((item) => {
    const address = outputScriptToAddress(this.networkSlug, item.outputScript)
    if (address !== undefined && address !== '') {
      const currentValue = addressSatsMap.get(address) ?? 0n
      addressSatsMap.set(address, currentValue + item.sats)
    }
  })
  const unitDivisor = this.networkId === XEC_NETWORK_ID
    ? 1e2
    : (this.networkId === BCH_NETWORK_ID ? 1e8 : 1)
  const result: Array<{ address: string, index: number, amount: Prisma.Decimal }> = []
  let index = 0
  for (const [address, sats] of addressSatsMap.entries()) {
    const decimal = new Prisma.Decimal(sats.toString())
    const amount = decimal.dividedBy(unitDivisor)
    result.push({ address, index, amount })
    index++
  }
  return result
}

private getSortedInputAddresses (transaction: Tx) {
  return this.getSortedAddresses(transaction.inputs)
}

private getSortedOutputAddresses (transaction: Tx) {
  return this.getSortedAddresses(transaction.outputs)
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@services/chronikService.ts` around lines 529 - 573, getSortedInputAddresses and getSortedOutputAddresses duplicate the same logic; extract the common behavior into a helper method (e.g., getSortedAddresses) that accepts items: Array<{ outputScript?: string, sats: bigint }>, moves the Map aggregation, unitDivisor calculation, Prisma.Decimal conversion and result construction there, and then have getSortedInputAddresses(transaction: Tx) and getSortedOutputAddresses(transaction: Tx) simply call this.getSortedAddresses(transaction.inputs) and this.getSortedAddresses(transaction.outputs) respectively; ensure you reuse this.networkSlug and this.networkId in the helper and preserve the return type Array<{address: string, index: number, amount: Prisma.Decimal}>.

constants/index.ts (1)
280-292: LGTM on batch size reductions; minor i18n note.

The reduced concurrency (128→16) and batch sizes (2000→200) align well with the PR goal of avoiding long DB transactions. The new constants for prices connection are appropriately sized.

Minor: Line 282 has a Portuguese comment (`// tamanho dos lotes para commit no DB`). Consider using English for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@constants/index.ts` around lines 280 - 292, The comment for DB_COMMIT_BATCH_SIZE contains Portuguese text; update the inline comment next to the DB_COMMIT_BATCH_SIZE constant to English for consistency (e.g., replace "// tamanho dos lotes para commit no DB" with an English description like "// batch size for DB commits") so the codebase uses a single language for comments and is easier for all contributors to read.

services/transactionService.ts (2)
749-764: Consider batching updates for very large sets.

The updates use `Promise.all` on individual update calls. For extremely large batches of updates, this could potentially hit connection pool limits. However, given that updates only occur when `confirmed`, `timestamp`, or `orphaned` change, this is likely a small subset in practice.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@services/transactionService.ts` around lines 749 - 764, The current code maps toUpdate into many concurrent prisma.transaction.update calls and awaits Promise.all, which can exhaust DB connections for very large toUpdate arrays; change this to process updates in controlled chunks (e.g., chunk size 25–100) and for each chunk run a prisma.$transaction of the chunk's update operations (or sequentially await each update in the chunk) to limit concurrency; update the logic around the toUpdate.map/Promise.all block (referencing toUpdate, prisma.transaction.update, includeNetwork, and updatedTransactions) to collect results from each chunk and push them into updatedTransactions.
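The chunking strategy this comment suggests can be sketched generically. The helper below is hypothetical (the real fix would likely wrap each chunk in `prisma.$transaction`); it simply caps how many operations run concurrently per batch instead of firing one `Promise.all` over everything.

```typescript
// Hypothetical sketch: process items in fixed-size chunks so at most
// `chunkSize` operations are in flight at once.
async function processInChunks<T, R> (
  items: T[],
  chunkSize: number,
  op: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = []
  for (let i = 0; i < items.length; i += chunkSize) {
    const chunk = items.slice(i, i + chunkSize)
    // Each chunk completes fully before the next one starts.
    results.push(...await Promise.all(chunk.map(op)))
  }
  return results
}
```

With a chunk size in the 25-100 range suggested by the review, the number of simultaneously open DB connections is bounded regardless of how large the update set grows.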
684-696: Minor performance improvement possible: O(n²) lookup in loop.

Line 688 uses `createdTxs.find()` inside a loop over `newTxs`, resulting in O(n*m) complexity. For large batches, this could be slow.

♻️ Proposed optimization using a Map

```diff
 // Create a map to match transactions with their inputs/outputs
-const txMap = new Map<string, { tx: typeof createdTxs[0], inputs: typeof txInputsOutputs[0]['inputs'], outputs: typeof txInputsOutputs[0]['outputs'] }>()
+// Build a lookup map for created transactions
+const createdTxMap = new Map<string, typeof createdTxs[0]>()
+for (const ct of createdTxs) {
+  createdTxMap.set(`${ct.hash}:${ct.addressId}`, ct)
+}
+
+const txMap = new Map<string, { tx: typeof createdTxs[0], inputs: typeof txInputsOutputs[0]['inputs'], outputs: typeof txInputsOutputs[0]['outputs'] }>()
 for (let i = 0; i < newTxs.length; i++) {
   const tx = newTxs[i]
-  const created = createdTxs.find(ct => ct.hash === tx.hash && ct.addressId === tx.addressId)
+  const created = createdTxMap.get(`${tx.hash}:${tx.addressId}`)
   if (created != null) {
     txMap.set(`${tx.hash}:${tx.addressId}`, { tx: created as any, inputs: newTxsInputsOutputs[i].inputs, outputs: newTxsInputsOutputs[i].outputs })
   }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@services/transactionService.ts` around lines 684 - 696, Replace the O(n*m) createdTxs.find() inside the newTxs loop with a prebuilt lookup: create a Map keyed by `${ct.hash}:${ct.addressId}` from createdTxs before iterating newTxs, then inside the loop use that map to retrieve the matching created entry and set txMap; update the code around txMap, createdTxs, newTxs, newTxsInputsOutputs, tx.hash and tx.addressId to use the constant-time lookup and keep existing shape/types for tx, inputs and outputs.
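Reduced to its essentials, the Map-based matching pattern looks like this (hypothetical names, not the actual service code): one O(m) pass builds the index, then each lookup is O(1) instead of an O(m) scan.

```typescript
// Hypothetical sketch: match records by a composite `${hash}:${addressId}`
// key via a prebuilt Map, turning an O(n*m) find-in-loop into O(n + m).
interface TxKeyed { hash: string, addressId: string }

function matchByKey<T extends TxKeyed> (
  created: T[],
  incoming: TxKeyed[]
): Array<T | undefined> {
  const byKey = new Map<string, T>()
  for (const ct of created) {
    byKey.set(`${ct.hash}:${ct.addressId}`, ct)
  }
  // Constant-time lookup per incoming item; misses yield undefined.
  return incoming.map(tx => byKey.get(`${tx.hash}:${tx.addressId}`))
}
```

The composite string key works here because neither field can contain the `:` separator in a way that creates collisions between distinct (hash, addressId) pairs; a tuple-keyed structure would be needed otherwise.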
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- constants/index.ts
- jobs/initJobs.ts
- jobs/workers.ts
- pages/api/address/balance/[address].ts
- services/chronikService.ts
- services/transactionService.ts
- utils/index.ts
✅ Files skipped from review due to trivial changes (1)
- pages/api/address/balance/[address].ts
```ts
void await syncBlockchainAndPricesWorker(blockchainQueue.name, async () => {
  await pricesQueue.add('syncCurrentPrices',
    {},
    {
      jobId: 'syncCurrentPrices',
      removeOnFail: false,
      repeat: {
        every: CURRENT_PRICE_REPEAT_DELAY
      }
    }
  )
  await syncCurrentPricesWorker(pricesQueue.name)
  console.log('Current prices sync job started after blockchain sync completion.')
})
```
Redundant void await pattern.
Line 28 uses void await syncBlockchainAndPricesWorker(...) which is contradictory - void discards the value while await waits for the promise. Since the worker sets up event handlers and returns, this should either be await (to ensure setup completes before continuing) or void (fire-and-forget), but not both.
Given the current code flow where cleanupQueue.add runs immediately after, this appears intentional as fire-and-forget. If so, remove the await.
🔧 Proposed fix

```diff
-void await syncBlockchainAndPricesWorker(blockchainQueue.name, async () => {
+void syncBlockchainAndPricesWorker(blockchainQueue.name, async () => {
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
void syncBlockchainAndPricesWorker(blockchainQueue.name, async () => {
  await pricesQueue.add('syncCurrentPrices',
    {},
    {
      jobId: 'syncCurrentPrices',
      removeOnFail: false,
      repeat: {
        every: CURRENT_PRICE_REPEAT_DELAY
      }
    }
  )
  await syncCurrentPricesWorker(pricesQueue.name)
  console.log('Current prices sync job started after blockchain sync completion.')
})
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@jobs/initJobs.ts` around lines 28 - 41, The call to
syncBlockchainAndPricesWorker uses the redundant "void await" pattern; remove
the "await" and use "void syncBlockchainAndPricesWorker(...)" to make it a
proper fire-and-forget call (leave the rest of the block that enqueues
syncCurrentPricesWorker and starts the job unchanged), referencing the
syncBlockchainAndPricesWorker invocation so the change is applied to that exact
call site.
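The semantics the reviewer is distinguishing can be demonstrated in isolation (a standalone sketch with hypothetical names, not PayButton code): `void f()` discards the promise and continues immediately, while `await f()` suspends until the promise settles — so `void await f()` awaits and then discards the value, making the `void` redundant.

```typescript
// Standalone demonstration of void (fire-and-forget) vs await on a promise.
const order: string[] = []

function delayed (tag: string): Promise<void> {
  return new Promise(resolve => setTimeout(() => { order.push(tag); resolve() }, 10))
}

async function fireAndForget (): Promise<void> {
  void delayed('background') // not awaited: we return before the timer fires
  order.push('returned')
}

async function awaited (): Promise<void> {
  await delayed('foreground') // suspends until the promise settles
  order.push('returned')
}
```

In the fire-and-forget variant 'returned' is recorded before 'background'; in the awaited variant 'foreground' always precedes 'returned'.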
It's unclear what the use case is; it was added in PayButton#1110 to avoid having to do it later. It appears that this table is costly, as it contains a lot of outputs. Let's remove it for now in favor of disk size and performance. We can add it back later if we need it.
This relaxes some constraints on the database and improves performance.
Related to #1110.
Note that it removes entries from the db so should be applied with caution (backups).
Summary by CodeRabbit