diff --git a/.gitignore b/.gitignore index 5ebd6fe6..6cf324f2 100644 --- a/.gitignore +++ b/.gitignore @@ -38,4 +38,4 @@ redis/dump.rdb prisma/seeds/productionTxs.csv paybutton-config.json -dump.sql +dump.sql* diff --git a/constants/index.ts b/constants/index.ts index 5c80a592..eb58b79a 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -279,5 +279,5 @@ export const XEC_TX_EXPLORER_URL = 'https://explorer.e.cash/tx/' export const BCH_TX_EXPLORER_URL = 'https://blockchair.com/bitcoin-cash/transaction/' export const MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME = 2 -export const CHRONIK_INITIALIZATION_DELAY = 500 +export const CHRONIK_INITIALIZATION_DELAY = 2000 export const MEMPOOL_PROCESS_DELAY = 100 diff --git a/docker-compose-from-dump.yml b/docker-compose-from-dump.yml index 17e86a84..f9a5c5f8 100644 --- a/docker-compose-from-dump.yml +++ b/docker-compose-from-dump.yml @@ -48,7 +48,7 @@ services: depends_on: - db - cache - image: registry.supertokens.io/supertokens/supertokens-mysql + image: registry.supertokens.io/supertokens/supertokens-mysql:10.0.0 restart: always ports: - 3567:3567 diff --git a/docker-compose-prod.yml b/docker-compose-prod.yml index 8f8b8f93..bbcdff03 100644 --- a/docker-compose-prod.yml +++ b/docker-compose-prod.yml @@ -18,7 +18,7 @@ services: container_name: paybutton-users-service depends_on: - cache - image: registry.supertokens.io/supertokens/supertokens-mysql + image: registry.supertokens.io/supertokens/supertokens-mysql:10.0.0 restart: always ports: - 3567:3567 diff --git a/docker-compose.yml b/docker-compose.yml index 13f9a868..c75ef9b8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -44,7 +44,7 @@ services: depends_on: - db - cache - image: registry.supertokens.io/supertokens/supertokens-mysql + image: registry.supertokens.io/supertokens/supertokens-mysql:10.0.0 restart: always ports: - 3567:3567 diff --git a/services/addressService.ts b/services/addressService.ts index 50bac46b..6b930512 100644 --- a/services/addressService.ts +++ b/services/addressService.ts @@ -298,6 +298,19 @@ export async function fetchAddressById (addressId: string, includePaybuttons = f return result } +export async function setSyncingBatch (addressStringArray: string[], syncing: boolean): Promise { + await prisma.address.updateMany({ + where: { + address: { + in: addressStringArray + } + }, + data: { + syncing + } + }) +} + export async function setSyncing (addressString: string, syncing: boolean): Promise { const result = await prisma.address.update({ where: { diff --git a/services/chronikService.ts b/services/chronikService.ts index 43dd4f90..ec56fc33 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -16,7 +16,7 @@ import { import { Address, Prisma } from '@prisma/client' import xecaddr from 'xecaddrjs' import { getAddressPrefix, satoshisToUnit } from 'utils/index' -import { fetchAddressBySubstring, fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, updateLastSynced } from './addressService' +import { fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, setSyncingBatch, updateLastSynced } from './addressService' import * as ws from 'ws' import { BroadcastTxData } from 'ws-service/types' import config from 'config' @@ -284,17 +284,14 @@ export class ChronikBlockchainClient { return (await this.chronik.script(type, hash160).history(page, pageSize)).txs } - public async * syncTransactionsForAddress (addressString: string, fully = false, runTriggers = false): AsyncGenerator { - const address = await fetchAddressBySubstring(addressString) - if (address.syncing) { return } - await setSyncing(addressString, true) + public async * syncTransactionsForAddress (address: Address, fully = false, runTriggers = false): AsyncGenerator { const pageSize = FETCH_N let page = 0 const earliestUnconfirmedTxTimestamp = await getEarliestUnconfirmedTxTimestampForAddress(address.id) const latestTimestamp = earliestUnconfirmedTxTimestamp ?? await getLatestConfirmedTxTimestampForAddress(address.id) ?? 0 while (true) { - let transactions = await this.getPaginatedTxs(addressString, page, pageSize) + let transactions = await this.getPaginatedTxs(address.address, page, pageSize) // filter out transactions that happened before a certain date set in constants/index, // this date is understood as the beginning and we don't look past it @@ -318,11 +315,11 @@ export class ChronikBlockchainClient { if (persistedTransactions.length > 0) { const simplifiedTransactions = getSimplifiedTransactions(persistedTransactions) - console.log(`${this.CHRONIK_MSG_PREFIX}: added ${simplifiedTransactions.length} txs to ${addressString}`) + console.log(`${this.CHRONIK_MSG_PREFIX}: added ${simplifiedTransactions.length} txs to ${address.address}`) const broadcastTxData: BroadcastTxData = {} as BroadcastTxData broadcastTxData.messageType = 'OldTx' - broadcastTxData.address = addressString + broadcastTxData.address = address.address broadcastTxData.txs = simplifiedTransactions this.wsEndpoint.emit(SOCKET_MESSAGES.TXS_BROADCAST, broadcastTxData) @@ -335,8 +332,8 @@ export class ChronikBlockchainClient { await new Promise(resolve => setTimeout(resolve, FETCH_DELAY)) } - await setSyncing(addressString, false) - await updateLastSynced(addressString) + await setSyncing(address.address, false) + await updateLastSynced(address.address) } private async getUtxos (address: string): Promise { @@ -408,12 +405,22 @@ export class ChronikBlockchainClient { return sortedInputAddresses } + public async waitForSyncing (txId: string, addressStringArray: string[]): Promise { + if (!this.initializing) return + console.log(`${this.CHRONIK_MSG_PREFIX}: Waiting unblocking addresses for ${txId}`) + while (true) { + const addresses = await fetchAddressesArray(addressStringArray) + if (addresses.every(a => !a.syncing)) { + console.log(`${this.CHRONIK_MSG_PREFIX}: Finished unblocking addresses for ${txId}`) + return + } + await new Promise(resolve => setTimeout(resolve, CHRONIK_INITIALIZATION_DELAY)) + } + } + private async processWsMessage (msg: WsMsgClient): Promise { // delete unconfirmed transaction from our database // if they were cancelled and not confirmed - while (this.initializing) { - await new Promise(resolve => setTimeout(resolve, CHRONIK_INITIALIZATION_DELAY)) - } if (msg.type === 'Tx') { if (msg.msgType === 'TX_REMOVED_FROM_MEMPOOL') { console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] ${msg.txid}`) @@ -433,13 +440,14 @@ export class ChronikBlockchainClient { if (this.isAlreadyBeingProcessed(msg.txid, false)) { return } - while (this.mempoolTxsBeingProcessed > MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME) { + while (this.mempoolTxsBeingProcessed >= MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME) { await new Promise(resolve => setTimeout(resolve, MEMPOOL_PROCESS_DELAY)) } this.mempoolTxsBeingProcessed += 1 console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] ${msg.txid}`) const transaction = await this.chronik.tx(msg.txid) const addressesWithTransactions = await this.getAddressesForTransaction(transaction) + await this.waitForSyncing(msg.txid, addressesWithTransactions.map(obj => obj.address.address)) const inputAddresses = this.getSortedInputAddresses(transaction) for (const addressWithTransaction of addressesWithTransactions) { const { created, tx } = await upsertTransaction(addressWithTransaction.transaction) @@ -456,6 +464,9 @@ export class ChronikBlockchainClient { console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] Height: ${msg.blockHeight} Hash: ${msg.blockHash}`) if (msg.msgType === 'BLK_FINALIZED') { console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] Syncing ${this.confirmedTxsHashesFromLastBlock.length} txs on the block...`) + while (this.initializing) { + await new Promise(resolve => setTimeout(resolve, CHRONIK_INITIALIZATION_DELAY)) + } await this.syncBlockTransactions(msg.blockHash) console.log(`${this.CHRONIK_MSG_PREFIX}: [${msg.msgType}] Syncing done.`) this.confirmedTxsHashesFromLastBlock = [] @@ -567,9 +578,10 @@ export class ChronikBlockchainClient { } } console.log(`${this.CHRONIK_MSG_PREFIX} Syncing ${addresses.length} addresses...`) + await setSyncingBatch(addresses.map(a => a.address), true) for (const addr of addresses) { try { - const generator = this.syncTransactionsForAddress(addr.address, false, runTriggers) + const generator = this.syncTransactionsForAddress(addr, false, runTriggers) let count = 0 while (true) { const result = await generator.next() @@ -593,7 +605,10 @@ export class ChronikBlockchainClient { } } const failedAddresses = Object.keys(failedAddressesWithErrors) - console.log(`${this.CHRONIK_MSG_PREFIX} Finished syncing ${addresses.length} addresses with ${failedAddresses.length} errors.`) + const totalSyncedTxsArray = Object.values(successfulAddressesWithCount) + const totalSyncedTxsCount = totalSyncedTxsArray.reduce((prev, curr) => prev + curr, 0) + + console.log(`${this.CHRONIK_MSG_PREFIX} Finished syncing ${totalSyncedTxsCount} txs for ${addresses.length} addresses with ${failedAddresses.length} errors.`) if (failedAddresses.length > 0) { console.log(`${this.CHRONIK_MSG_PREFIX} Failed addresses were:\n- ${Object.entries(failedAddressesWithErrors).map((kv: [string, string]) => `${kv[0]}: ${kv[1]}`).join('\n- ')}`) } @@ -704,13 +719,13 @@ class MultiBlockchainClient { void (async () => { if (this.isRunningApp()) { await syncPastDaysNewerPrices() - await connectAllTransactionsToPrices() const asyncOperations: Array> = [] this.clients = { ecash: this.instantiateChronikClient('ecash', asyncOperations), bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations) } await Promise.all(asyncOperations) + this.initializing = false await connectAllTransactionsToPrices() this.clients.ecash.setInitialized() this.clients.bitcoincash.setInitialized() @@ -721,8 +736,8 @@ class MultiBlockchainClient { bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations) } await Promise.all(asyncOperations) + this.initializing = false } - this.initializing = false })() } diff --git a/services/transactionService.ts b/services/transactionService.ts index 57a03c06..d341c679 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -461,13 +461,16 @@ export async function connectTransactionsListToPrices (txList: Transaction[]): P } export async function connectAllTransactionsToPrices (): Promise { + console.log('[PRICES] Started connecting txs to prices.') const noPricesTxs = await fetchAllTransactionsWithNoPrices() + console.log(`[PRICES] Found ${noPricesTxs.length} txs with no prices.`) const wrongNumberOfPricesTxs = await fetchAllTransactionsWithIrregularPrices() + console.log(`[PRICES] Found ${wrongNumberOfPricesTxs.length} txs with irregular prices.`) const txs = [ ...noPricesTxs, ...wrongNumberOfPricesTxs ] - console.log(`[PRICES] Connecting ${noPricesTxs.length} txs with no prices and ${wrongNumberOfPricesTxs.length} with irregular prices...`) + console.log('[PRICES] Connecting txs to prices...') void await connectTransactionsListToPrices(txs) console.log('[PRICES] Finished connecting txs to prices.') } diff --git a/tests/unittests/chronikService.test.ts b/tests/unittests/chronikService.test.ts index 614ab108..5979a0af 100644 --- a/tests/unittests/chronikService.test.ts +++ b/tests/unittests/chronikService.test.ts @@ -10,6 +10,7 @@ import { ChronikBlockchainClient, multiBlockchainClient } from '../../services/chronikService' +import { Address } from '@prisma/client' // Mock the heavy dependencies to avoid network calls in tests jest.mock('chronik-client-cashtokens', () => ({ @@ -49,6 +50,7 @@ jest.mock('../../services/addressService', () => ({ getEarliestUnconfirmedTxTimestampForAddress: jest.fn(), getLatestConfirmedTxTimestampForAddress: jest.fn(), setSyncing: jest.fn(), + setSyncingBatch: jest.fn(), updateLastSynced: jest.fn() })) @@ -922,7 +924,7 @@ describe('ChronikBlockchainClient advanced functionality', () => { // Mock the sync generator method const mockSyncGenerator = { - async *syncTransactionsForAddress(address: string) { + async *syncTransactionsForAddress(address: Address) { yield [] return }