Skip to content
Merged
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ redis/dump.rdb
prisma/seeds/productionTxs.csv
paybutton-config.json

dump.sql
dump.sql*
2 changes: 1 addition & 1 deletion constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,5 +279,5 @@ export const XEC_TX_EXPLORER_URL = 'https://explorer.e.cash/tx/'
export const BCH_TX_EXPLORER_URL = 'https://blockchair.com/bitcoin-cash/transaction/'

export const MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME = 2
export const CHRONIK_INITIALIZATION_DELAY = 500
export const CHRONIK_INITIALIZATION_DELAY = 2000
export const MEMPOOL_PROCESS_DELAY = 100
2 changes: 1 addition & 1 deletion docker-compose-from-dump.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ services:
depends_on:
- db
- cache
image: registry.supertokens.io/supertokens/supertokens-mysql
image: registry.supertokens.io/supertokens/supertokens-mysql:10.0.0
restart: always
ports:
- 3567:3567
Expand Down
2 changes: 1 addition & 1 deletion docker-compose-prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ services:
container_name: paybutton-users-service
depends_on:
- cache
image: registry.supertokens.io/supertokens/supertokens-mysql
image: registry.supertokens.io/supertokens/supertokens-mysql:10.0.0
restart: always
ports:
- 3567:3567
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ services:
depends_on:
- db
- cache
image: registry.supertokens.io/supertokens/supertokens-mysql
image: registry.supertokens.io/supertokens/supertokens-mysql:10.0.0
restart: always
ports:
- 3567:3567
Expand Down
13 changes: 13 additions & 0 deletions services/addressService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,19 @@ export async function fetchAddressById (addressId: string, includePaybuttons = f
return result
}

export async function setSyncingBatch (addressStringArray: string[], syncing: boolean): Promise<void> {
await prisma.address.updateMany({
where: {
address: {
in: addressStringArray
}
},
data: {
syncing
}
})
}

export async function setSyncing (addressString: string, syncing: boolean): Promise<void> {
const result = await prisma.address.update({
where: {
Expand Down
51 changes: 33 additions & 18 deletions services/chronikService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
import { Address, Prisma } from '@prisma/client'
import xecaddr from 'xecaddrjs'
import { getAddressPrefix, satoshisToUnit } from 'utils/index'
import { fetchAddressBySubstring, fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, updateLastSynced } from './addressService'
import { fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, setSyncingBatch, updateLastSynced } from './addressService'
import * as ws from 'ws'
import { BroadcastTxData } from 'ws-service/types'
import config from 'config'
Expand Down Expand Up @@ -284,17 +284,14 @@ export class ChronikBlockchainClient {
return (await this.chronik.script(type, hash160).history(page, pageSize)).txs
}

public async * syncTransactionsForAddress (addressString: string, fully = false, runTriggers = false): AsyncGenerator<TransactionWithAddressAndPrices[]> {
const address = await fetchAddressBySubstring(addressString)
if (address.syncing) { return }
await setSyncing(addressString, true)
public async * syncTransactionsForAddress (address: Address, fully = false, runTriggers = false): AsyncGenerator<TransactionWithAddressAndPrices[]> {
const pageSize = FETCH_N
let page = 0
const earliestUnconfirmedTxTimestamp = await getEarliestUnconfirmedTxTimestampForAddress(address.id)
const latestTimestamp = earliestUnconfirmedTxTimestamp ?? await getLatestConfirmedTxTimestampForAddress(address.id) ?? 0

while (true) {
let transactions = await this.getPaginatedTxs(addressString, page, pageSize)
let transactions = await this.getPaginatedTxs(address.address, page, pageSize)

// filter out transactions that happened before a certain date set in constants/index,
// this date is understood as the beginning and we don't look past it
Expand All @@ -318,11 +315,11 @@ export class ChronikBlockchainClient {
if (persistedTransactions.length > 0) {
const simplifiedTransactions = getSimplifiedTransactions(persistedTransactions)

console.log(`${this.CHRONIK_MSG_PREFIX}: added ${simplifiedTransactions.length} txs to ${addressString}`)
console.log(`${this.CHRONIK_MSG_PREFIX}: added ${simplifiedTransactions.length} txs to ${address.address}`)

const broadcastTxData: BroadcastTxData = {} as BroadcastTxData
broadcastTxData.messageType = 'OldTx'
broadcastTxData.address = addressString
broadcastTxData.address = address.address
broadcastTxData.txs = simplifiedTransactions

this.wsEndpoint.emit(SOCKET_MESSAGES.TXS_BROADCAST, broadcastTxData)
Expand All @@ -335,8 +332,8 @@ export class ChronikBlockchainClient {

await new Promise(resolve => setTimeout(resolve, FETCH_DELAY))
}
await setSyncing(addressString, false)
await updateLastSynced(addressString)
await setSyncing(address.address, false)
await updateLastSynced(address.address)
}

private async getUtxos (address: string): Promise<ScriptUtxo[]> {
Expand Down Expand Up @@ -408,12 +405,22 @@ export class ChronikBlockchainClient {
return sortedInputAddresses
}

public async waitForSyncing (txId: string, addressStringArray: string[]): Promise<void> {
if (!this.initializing) return
console.log(`${this.CHRONIK_MSG_PREFIX}: Waiting unblocking addresses for ${txId}`)
while (true) {
const addresses = await fetchAddressesArray(addressStringArray)
if (addresses.every(a => !a.syncing)) {
console.log(`${this.CHRONIK_MSG_PREFIX}: Finished unblocking addresses for ${txId}`)
return
}
await new Promise(resolve => setTimeout(resolve, CHRONIK_INITIALIZATION_DELAY))
}
}

private async processWsMessage (msg: WsMsgClient): Promise<void> {
// delete unconfirmed transaction from our database
// if they were cancelled and not confirmed
while (this.initializing) {
await new Promise(resolve => setTimeout(resolve, CHRONIK_INITIALIZATION_DELAY))
}
if (msg.type === 'Tx') {
if (msg.msgType === 'TX_REMOVED_FROM_MEMPOOL') {
console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] ${msg.txid}`)
Expand All @@ -433,13 +440,14 @@ export class ChronikBlockchainClient {
if (this.isAlreadyBeingProcessed(msg.txid, false)) {
return
}
while (this.mempoolTxsBeingProcessed > MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME) {
while (this.mempoolTxsBeingProcessed >= MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME) {
Copy link

Copilot AI Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition changed from > to >= will cause the system to wait when exactly MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME transactions are being processed, potentially reducing throughput unnecessarily. The original > condition was correct as it allows processing up to the maximum limit.

Suggested change
while (this.mempoolTxsBeingProcessed >= MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME) {
while (this.mempoolTxsBeingProcessed > MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME) {

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this.mempoolTxsBeingProcessed === MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, it means we already have the max being processed, so it should keep waiting.

await new Promise(resolve => setTimeout(resolve, MEMPOOL_PROCESS_DELAY))
}
this.mempoolTxsBeingProcessed += 1
console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] ${msg.txid}`)
const transaction = await this.chronik.tx(msg.txid)
const addressesWithTransactions = await this.getAddressesForTransaction(transaction)
await this.waitForSyncing(msg.txid, addressesWithTransactions.map(obj => obj.address.address))
const inputAddresses = this.getSortedInputAddresses(transaction)
for (const addressWithTransaction of addressesWithTransactions) {
const { created, tx } = await upsertTransaction(addressWithTransaction.transaction)
Expand All @@ -456,6 +464,9 @@ export class ChronikBlockchainClient {
console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] Height: ${msg.blockHeight} Hash: ${msg.blockHash}`)
if (msg.msgType === 'BLK_FINALIZED') {
console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] Syncing ${this.confirmedTxsHashesFromLastBlock.length} txs on the block...`)
while (this.initializing) {
await new Promise(resolve => setTimeout(resolve, CHRONIK_INITIALIZATION_DELAY))
}
await this.syncBlockTransactions(msg.blockHash)
console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] Syncing done.`)
this.confirmedTxsHashesFromLastBlock = []
Expand Down Expand Up @@ -567,9 +578,10 @@ export class ChronikBlockchainClient {
}
}
console.log(`${this.CHRONIK_MSG_PREFIX} Syncing ${addresses.length} addresses...`)
await setSyncingBatch(addresses.map(a => a.address), true)
for (const addr of addresses) {
try {
const generator = this.syncTransactionsForAddress(addr.address, false, runTriggers)
const generator = this.syncTransactionsForAddress(addr, false, runTriggers)
let count = 0
while (true) {
const result = await generator.next()
Expand All @@ -593,7 +605,10 @@ export class ChronikBlockchainClient {
}
}
const failedAddresses = Object.keys(failedAddressesWithErrors)
console.log(`${this.CHRONIK_MSG_PREFIX} Finished syncing ${addresses.length} addresses with ${failedAddresses.length} errors.`)
const totalSyncedTxsArray = Object.values(successfulAddressesWithCount)
const totalSyncedTxsCount = totalSyncedTxsArray.reduce((prev, curr) => prev + curr, 0)

console.log(`${this.CHRONIK_MSG_PREFIX} Finished syncing ${totalSyncedTxsCount} txs for ${addresses.length} addresses with ${failedAddresses.length} errors.`)
if (failedAddresses.length > 0) {
console.log(`${this.CHRONIK_MSG_PREFIX} Failed addresses were:\n- ${Object.entries(failedAddressesWithErrors).map((kv: [string, string]) => `${kv[0]}: ${kv[1]}`).join('\n- ')}`)
}
Expand Down Expand Up @@ -704,13 +719,13 @@ class MultiBlockchainClient {
void (async () => {
if (this.isRunningApp()) {
await syncPastDaysNewerPrices()
await connectAllTransactionsToPrices()
const asyncOperations: Array<Promise<void>> = []
this.clients = {
ecash: this.instantiateChronikClient('ecash', asyncOperations),
bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations)
}
await Promise.all(asyncOperations)
this.initializing = false
await connectAllTransactionsToPrices()
Comment on lines +728 to 729
Copy link

Copilot AI Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting this.initializing = false before calling connectAllTransactionsToPrices() could cause race conditions. If WebSocket messages arrive after initialization is marked complete but before price connections finish, they may not wait for address syncing to complete properly.

Suggested change
this.initializing = false
await connectAllTransactionsToPrices()
await connectAllTransactionsToPrices()
this.initializing = false

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the reason to wait missed txs to be synced with prices to processes txs that have arrived during the sync. Only reason could be too many DB connections, as far as I can see, which I don't think is reason enough, but I could be wrong

this.clients.ecash.setInitialized()
this.clients.bitcoincash.setInitialized()
Expand All @@ -721,8 +736,8 @@ class MultiBlockchainClient {
bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations)
}
await Promise.all(asyncOperations)
this.initializing = false
}
this.initializing = false
})()
}

Expand Down
5 changes: 4 additions & 1 deletion services/transactionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,16 @@ export async function connectTransactionsListToPrices (txList: Transaction[]): P
}

export async function connectAllTransactionsToPrices (): Promise<void> {
console.log('[PRICES] Started connecting txs to prices.')
const noPricesTxs = await fetchAllTransactionsWithNoPrices()
console.log(`[PRICES] Found ${noPricesTxs.length} txs with no prices.`)
const wrongNumberOfPricesTxs = await fetchAllTransactionsWithIrregularPrices()
console.log(`[PRICES] Found ${wrongNumberOfPricesTxs.length} txs with irregular prices.`)
const txs = [
...noPricesTxs,
...wrongNumberOfPricesTxs
]
console.log(`[PRICES] Connecting ${noPricesTxs.length} txs with no prices and ${wrongNumberOfPricesTxs.length} with irregular prices...`)
console.log('[PRICES] Connecting txs to prices...')
void await connectTransactionsListToPrices(txs)
console.log('[PRICES] Finished connecting txs to prices.')
}
Expand Down
4 changes: 3 additions & 1 deletion tests/unittests/chronikService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
ChronikBlockchainClient,
multiBlockchainClient
} from '../../services/chronikService'
import { Address } from '@prisma/client'

// Mock the heavy dependencies to avoid network calls in tests
jest.mock('chronik-client-cashtokens', () => ({
Expand Down Expand Up @@ -49,6 +50,7 @@ jest.mock('../../services/addressService', () => ({
getEarliestUnconfirmedTxTimestampForAddress: jest.fn(),
getLatestConfirmedTxTimestampForAddress: jest.fn(),
setSyncing: jest.fn(),
setSyncingBatch: jest.fn(),
updateLastSynced: jest.fn()
}))

Expand Down Expand Up @@ -922,7 +924,7 @@ describe('ChronikBlockchainClient advanced functionality', () => {

// Mock the sync generator method
const mockSyncGenerator = {
async *syncTransactionsForAddress(address: string) {
async *syncTransactionsForAddress(address: Address) {
yield []
return
}
Expand Down