Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ lint-master:
github-test-unit:
$(create_test_paybutton_json)
$(touch_local_env)
MASTER_SECRET_KEY="test" WS_AUTH_KEY="test" PRICE_API_TOKEN="foo" DATABASE_URL="mysql://paybutton-user-test:paybutton-password@db:3306/paybutton-test" npx ts-node -O '{"module":"commonjs"}' node_modules/jest/bin/jest.js tests/unittests --forceExit
MASTER_SECRET_KEY="test" WS_AUTH_KEY="test" PRICE_API_TOKEN="foo" npx ts-node -O '{"module":"commonjs"}' node_modules/jest/bin/jest.js tests/unittests --forceExit

# WARNING: this shouldn't be run on local machine, only on github. It will replace your config file
github-test-integration:
Expand Down
1 change: 1 addition & 0 deletions constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ export const NETWORK_TICKERS: KeyValueT<string> = {
}

export type NetworkTickersType = 'XEC' | 'BCH'
export type MainNetworkSlugsType = 'ecash' | 'bitcoincash'
export type NetworkSlugsType = 'ecash' | 'bitcoincash' | 'ectest' | 'bchtest'

export const NETWORK_TICKERS_FROM_ID: KeyValueT<string> = {
Expand Down
86 changes: 5 additions & 81 deletions jobs/initJobs.ts
Original file line number Diff line number Diff line change
@@ -1,89 +1,15 @@
import { XEC_NETWORK_ID, BCH_NETWORK_ID, CURRENT_PRICE_REPEAT_DELAY, SYNC_TXS_JOBS_RETRY_DELAY, SYNC_TXS_JOBS_MAX_RETRIES } from 'constants/index'
import { Queue, FlowProducer, FlowJob } from 'bullmq'
import { CURRENT_PRICE_REPEAT_DELAY } from 'constants/index'
import { Queue } from 'bullmq'
import { redisBullMQ } from 'redis/clientInstance'
import {
syncAllAddressTransactionsForNetworkWorker,
syncPricesWorker,
connectAllTransactionsToPricesWorker
} from './workers'
import EventEmitter from 'events'
import { syncCurrentPricesWorker } from './workers'

EventEmitter.defaultMaxListeners = 20

const RETRY_OPTS = {
attempts: SYNC_TXS_JOBS_MAX_RETRIES,
backoff: {
type: 'exponential',
delay: SYNC_TXS_JOBS_RETRY_DELAY
}
}

const main = async (): Promise<void> => {
const pricesQueue = new Queue('pricesSync', { connection: redisBullMQ })
const initTransactionsQueue = new Queue('initTransactionsSync', { connection: redisBullMQ })
const connectPricesQueue = new Queue('connectPrices', { connection: redisBullMQ })

const flowJobPrices: FlowJob = {
queueName: pricesQueue.name,
data: { syncType: 'past' },
name: 'syncPastPricesFlow',
opts: {
removeOnFail: false,
jobId: 'syncPastPrices',
...RETRY_OPTS
}
}
const flowJobSyncAndSubscribeXECAddresses: FlowJob = {
queueName: initTransactionsQueue.name,
data: { networkId: XEC_NETWORK_ID },
name: 'syncAndSubscribeXECAddressesFlow',
opts: {
removeOnFail: false,
jobId: 'syncAndSubscribeXECAddresses',
...RETRY_OPTS
}
}
const flowJobSyncAndSubscribeBCHAddresses: FlowJob = {
queueName: initTransactionsQueue.name,
data: { networkId: BCH_NETWORK_ID },
name: 'syncAndSubscribeBCHAddressesFlow',
opts: {
removeOnFail: false,
jobId: 'syncAndSubscribeBCHAddresses',
...RETRY_OPTS
}
}

const flowJobConnectAllTransactions: FlowJob = {
queueName: connectPricesQueue.name,
data: {},
name: 'connectAllTransactionsToPricesFlow',
opts: {
removeOnComplete: false,
removeOnFail: { count: 3 },
jobId: 'connectAllTransactionsToPrices',
...RETRY_OPTS
}
}

const flowProducer = new FlowProducer({ connection: redisBullMQ })
await flowProducer.add({
...flowJobConnectAllTransactions,
children: [
{
...flowJobSyncAndSubscribeBCHAddresses,
children: [
{
...flowJobSyncAndSubscribeXECAddresses,
children: [flowJobPrices]
}
]
}
]
})

await pricesQueue.add('syncCurrentPrices',
{ syncType: 'current' },
{},
{
jobId: 'syncCurrentPrices',
removeOnFail: false,
Expand All @@ -93,9 +19,7 @@ const main = async (): Promise<void> => {
}
)

await syncPricesWorker(pricesQueue.name)
await syncAllAddressTransactionsForNetworkWorker(initTransactionsQueue.name)
await connectAllTransactionsToPricesWorker(connectPricesQueue.name)
await syncCurrentPricesWorker(pricesQueue.name)
}

void main()
91 changes: 4 additions & 87 deletions jobs/workers.ts
Original file line number Diff line number Diff line change
@@ -1,69 +1,17 @@
import { Worker, Job } from 'bullmq'
import { Worker } from 'bullmq'
import { redisBullMQ } from 'redis/clientInstance'
import { DEFAULT_WORKER_LOCK_DURATION, RESPONSE_MESSAGES, KeyValueT } from 'constants/index'
import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index'

import * as transactionService from 'services/transactionService'
import * as priceService from 'services/priceService'
import * as addressService from 'services/addressService'
import { parseError } from 'utils/validators'

const syncAllAddressTransactionsForNetworkJob = async (job: Job): Promise<void> => {
console.log(`job ${job.id as string}: syncing all addresses for network ${job.data.networkId as string}...`)
let failedAddressesWithErrors: KeyValueT<string> = {}
try {
let addresses = await addressService.fetchAllAddressesForNetworkId(job.data.networkId)
console.log(`found ${addresses.length} addresses...`)
addresses = job.data.fully === true ? addresses : addresses.filter(addr => addr.lastSynced == null)
failedAddressesWithErrors = (await transactionService.syncAddresses(addresses, job.data.fully === true)).failedAddressesWithErrors
} catch (err: any) {
const parsedError = parseError(err)
if (parsedError.message === RESPONSE_MESSAGES.TRANSACTION_ALREADY_EXISTS_FOR_ADDRESS_400.message) {
console.log(`initial syncing of network ${job.data.networkId as string} encountered known transaction, skipping...`)
} else {
if (Object.keys(failedAddressesWithErrors).length > 0) {
console.error(`ERROR: (skipping anyway) initial syncing of network ${job.data.networkId as string} FAILED for addresses ${JSON.stringify(failedAddressesWithErrors)}: ${err.message as string}`)
} else {
console.error(`ERROR: (skipping anyway) initial syncing of network ${job.data.networkId as string} FAILED: ${err.message as string}`)
}
}
}
}

export const syncAllAddressTransactionsForNetworkWorker = async (queueName: string): Promise<void> => {
const worker = new Worker(
queueName,
syncAllAddressTransactionsForNetworkJob,
{
connection: redisBullMQ,
lockDuration: DEFAULT_WORKER_LOCK_DURATION
}
)
worker.on('completed', job => {
console.log(`initial syncing finished for network ${job.data.networkId as string}`)
})

worker.on('failed', (job, err) => {
if (job !== undefined) {
console.log(`initial syncing FAILED for network ${job.data.networkId as string}`)
console.log(`error for initial syncing of network ${job.data.networkId as string}: ${err.message}`)
}
})
}

export const syncPricesWorker = async (queueName: string): Promise<void> => {
export const syncCurrentPricesWorker = async (queueName: string): Promise<void> => {
const worker = new Worker(
queueName,
async (job) => {
const syncType = job.data.syncType
console.log(`job ${job.id as string}: syncing ${syncType as string} prices...`)

if (syncType === 'past') {
await priceService.syncPastDaysNewerPrices()
} else if (syncType === 'current') {
await priceService.syncCurrentPrices()
} else {
console.log(`Unknown type of price sync: ${job.data.syncType as string}`)
}
await priceService.syncCurrentPrices()
},
{
connection: redisBullMQ,
Expand All @@ -81,34 +29,3 @@ export const syncPricesWorker = async (queueName: string): Promise<void> => {
}
})
}

export const connectAllTransactionsToPricesWorker = async (queueName: string): Promise<void> => {
const worker = new Worker(
queueName,
async (job) => {
console.log(`job ${job.id as string}: connecting prices to transactions...`)
const noPricesTxs = await transactionService.fetchAllTransactionsWithNoPrices()
const wrongNumberOfPricesTxs = await transactionService.fetchAllTransactionsWithIrregularPrices()
const txs = [
...noPricesTxs,
...wrongNumberOfPricesTxs
]
console.log(`found ${noPricesTxs.length} with no prices and ${wrongNumberOfPricesTxs.length} txs with wrong number of prices`)
void await transactionService.connectTransactionsListToPrices(txs)
},
{
connection: redisBullMQ,
lockDuration: DEFAULT_WORKER_LOCK_DURATION
}
)
worker.on('completed', job => {
console.log('connection of prices to txs finished')
})

worker.on('failed', (job, err) => {
if (job !== undefined) {
console.log('automatic connecting of txs to prices FAILED')
console.log(`error for connecting txs to prices: ${err.message}: ${err.stack ?? 'no stack'}`)
}
})
}
4 changes: 2 additions & 2 deletions pages/api/address/balance/[address].ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { NextApiResponse, NextApiRequest } from 'next'
import { getBalance } from 'services/blockchainService'
import { RESPONSE_MESSAGES } from 'constants/index'
import { parseAddress } from 'utils/validators'
import Cors from 'cors'
import { runMiddleware, satoshisToUnit } from 'utils/index'
import xecaddr from 'xecaddrjs'
import { Prisma } from '@prisma/client'
import { multiBlockchainClient } from 'services/chronikService'

const { ADDRESS_NOT_PROVIDED_400 } = RESPONSE_MESSAGES
const cors = Cors({
Expand All @@ -17,7 +17,7 @@ export default async (req: NextApiRequest, res: NextApiResponse): Promise<void>
if (req.method === 'GET') {
try {
const address = parseAddress(req.query.address as string)
const response = await getBalance(address)
const response = await multiBlockchainClient.getBalance(address)
const balance = await satoshisToUnit(new Prisma.Decimal(response), xecaddr.detectAddressFormat(address))
res.status(200).send(balance)
} catch (err: any) {
Expand Down
5 changes: 3 additions & 2 deletions pages/api/address/transactions/[address].ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { NextApiResponse, NextApiRequest } from 'next'
import { parseAddress } from 'utils/validators'
import { DEFAULT_TX_PAGE_SIZE, RESPONSE_MESSAGES, TX_PAGE_SIZE_LIMIT } from 'constants/index'
import { fetchPaginatedAddressTransactions, syncAndSubscribeAddresses } from 'services/transactionService'
import { fetchPaginatedAddressTransactions } from 'services/transactionService'
import { upsertAddress } from 'services/addressService'
import Cors from 'cors'
import { runMiddleware } from 'utils/index'
import { multiBlockchainClient } from 'services/chronikService'

const { ADDRESS_NOT_PROVIDED_400, INVALID_ADDRESS_400, NO_ADDRESS_FOUND_404, STARTED_SYNC_200 } = RESPONSE_MESSAGES
const cors = Cors({
Expand Down Expand Up @@ -43,7 +44,7 @@ export default async (req: NextApiRequest, res: NextApiResponse): Promise<void>
case NO_ADDRESS_FOUND_404.message: {
if (serverOnly) throw new Error(NO_ADDRESS_FOUND_404.message)
const addressObject = await upsertAddress(address)
await syncAndSubscribeAddresses([addressObject])
await multiBlockchainClient.syncAndSubscribeAddresses([addressObject])
res.status(STARTED_SYNC_200.statusCode).json(STARTED_SYNC_200)
break
}
Expand Down
4 changes: 2 additions & 2 deletions pages/api/transaction/[transactionId].ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { NextApiResponse, NextApiRequest } from 'next/types'
import { getTransactionDetails } from 'services/blockchainService'
import { multiBlockchainClient } from 'services/chronikService'
import { RESPONSE_MESSAGES } from 'constants/index'

export default async (req: NextApiRequest, res: NextApiResponse): Promise<void> => {
Expand All @@ -10,7 +10,7 @@ export default async (req: NextApiRequest, res: NextApiResponse): Promise<void>
if (transactionId === '' || transactionId === undefined) {
throw new Error(RESPONSE_MESSAGES.TRANSACTION_ID_NOT_PROVIDED_400.message)
}
const response = await getTransactionDetails(transactionId, networkSlug)
const response = await multiBlockchainClient.getTransactionDetails(transactionId, networkSlug)
res.status(200).json(response)
} catch (err: any) {
switch (err.message) {
Expand Down
Loading