diff --git a/constants/index.ts b/constants/index.ts index b8bb6b91..72ae4c3a 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -291,3 +291,5 @@ export const TRIGGER_LOG_BATCH_SIZE = 200 export const PRICES_CONNECTION_BATCH_SIZE = 1_000 // interactive $transaction timeout in ms (for the single delete + several createMany of prices) export const PRICES_CONNECTION_TIMEOUT = 30_000 + +export const CLIENT_PAYMENT_EXPIRATION_TIME = (7) * (24 * 60 * 60 * 1000) // (number of days) * (24 * 60 * 60 * 1000) diff --git a/jobs/initJobs.ts b/jobs/initJobs.ts index a763cfe0..07291159 100644 --- a/jobs/initJobs.ts +++ b/jobs/initJobs.ts @@ -1,8 +1,8 @@ -import { CURRENT_PRICE_REPEAT_DELAY } from 'constants/index' +import { CLIENT_PAYMENT_EXPIRATION_TIME, CURRENT_PRICE_REPEAT_DELAY } from 'constants/index' import { Queue } from 'bullmq' import { redisBullMQ } from 'redis/clientInstance' import EventEmitter from 'events' -import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker } from './workers' +import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker, cleanupClientPaymentsWorker } from './workers' EventEmitter.defaultMaxListeners = 20 @@ -24,6 +24,22 @@ const main = async (): Promise => { const blockchainQueue = new Queue('blockchainSync', { connection: redisBullMQ }) await blockchainQueue.add('syncBlockchainAndPrices', {}, { jobId: 'syncBlockchainAndPrices' }) await syncBlockchainAndPricesWorker(blockchainQueue.name) + + const cleanupQueue = new Queue('clientPaymentCleanup', { connection: redisBullMQ }) + + await cleanupQueue.add( + 'cleanupClientPayments', + {}, + { + jobId: 'cleanupClientPayments', + removeOnFail: false, + repeat: { + every: CLIENT_PAYMENT_EXPIRATION_TIME + } + } + ) + + await cleanupClientPaymentsWorker(cleanupQueue.name) } void main() diff --git a/jobs/workers.ts b/jobs/workers.ts index 833ebca1..0f0efefe 100644 --- a/jobs/workers.ts +++ b/jobs/workers.ts @@ -3,6 +3,7 @@ import { redisBullMQ } from 'redis/clientInstance' import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index' import { multiBlockchainClient } from 'services/chronikService' import { connectAllTransactionsToPrices } from 'services/transactionService' +import { cleanupExpiredClientPayments } from 'services/clientPaymentService' import * as priceService from 'services/priceService' @@ -60,3 +61,26 @@ export const syncBlockchainAndPricesWorker = async (queueName: string): Promise< } }) } + +export const cleanupClientPaymentsWorker = async (queueName: string): Promise => { + const worker = new Worker( + queueName, + async (job) => { + console.log(`[CLIENT_PAYMENT CLEANUP] job ${job.id as string}: running expired payment cleanup...`) + await cleanupExpiredClientPayments() + console.log('[CLIENT_PAYMENT CLEANUP] cleanup finished.') + }, + { + connection: redisBullMQ, + lockDuration: DEFAULT_WORKER_LOCK_DURATION + } + ) + + worker.on('completed', job => { + console.log(`[CLIENT_PAYMENT CLEANUP] job ${job.id as string}: completed successfully`) + }) + + worker.on('failed', (job, err) => { + console.error(`[CLIENT_PAYMENT CLEANUP] job ${job?.id as string}: FAILED — ${err.message}`) + }) +} diff --git a/pages/api/payments/paymentId/index.ts b/pages/api/payments/paymentId/index.ts index 599178d2..9e070c73 100644 --- a/pages/api/payments/paymentId/index.ts +++ b/pages/api/payments/paymentId/index.ts @@ -1,5 +1,5 @@ import { Decimal } from '@prisma/client/runtime/library' -import { generatePaymentId } from 'services/transactionService' +import { generatePaymentId } from 'services/clientPaymentService' import { parseAddress, parseCreatePaymentIdPOSTRequest } from 'utils/validators' import { RESPONSE_MESSAGES } from 'constants/index' import { runMiddleware } from 'utils/index' diff --git a/services/chronikService.ts b/services/chronikService.ts index 1ef78561..64330f95 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -10,10 +10,12 @@ import { fetchUnconfirmedTransactions, upsertTransaction, getSimplifiedTransactions, - getSimplifiedTrasaction, + getSimplifiedTrasaction +} from './transactionService' +import { updateClientPaymentStatus, getClientPayment -} from './transactionService' +} from './clientPaymentService' import { Address, Prisma, ClientPaymentStatus } from '@prisma/client' import xecaddr from 'xecaddrjs' import { getAddressPrefix, satoshisToUnit } from 'utils/index' diff --git a/services/clientPaymentService.ts b/services/clientPaymentService.ts new file mode 100644 index 00000000..4668ad04 --- /dev/null +++ b/services/clientPaymentService.ts @@ -0,0 +1,82 @@ +import prisma from 'prisma-local/clientInstance' +import { v4 as uuidv4 } from 'uuid' +import { multiBlockchainClient } from 'services/chronikService' +import { Prisma, ClientPaymentStatus } from '@prisma/client' +import { NETWORK_IDS_FROM_SLUGS, CLIENT_PAYMENT_EXPIRATION_TIME } from 'constants/index' +import { parseAddress } from 'utils/validators' +import { addressExists } from './addressService' +import moment from 'moment' + +export const generatePaymentId = async (address: string, amount?: Prisma.Decimal): Promise => { + const rawUUID = uuidv4() + const cleanUUID = rawUUID.replace(/-/g, '') + const status = 'PENDING' as ClientPaymentStatus + address = parseAddress(address) + const prefix = address.split(':')[0].toLowerCase() + const networkId = NETWORK_IDS_FROM_SLUGS[prefix] + const isAddressRegistered = await addressExists(address) + + const clientPayment = await prisma.clientPayment.create({ + data: { + address: { + connectOrCreate: { + where: { address }, + create: { + address, + networkId + } + } + }, + paymentId: cleanUUID, + status, + amount + }, + include: { + address: true + } + }) + + if (!isAddressRegistered) { + void multiBlockchainClient.syncAndSubscribeAddresses([clientPayment.address]) + } + + return clientPayment.paymentId +} + +export const updateClientPaymentStatus = async (paymentId: string, status: ClientPaymentStatus): Promise => { + await prisma.clientPayment.update({ + where: { paymentId }, + data: { status } + }) +} + +export const getClientPayment = async (paymentId: string): Promise | null> => { + return await prisma.clientPayment.findUnique({ + where: { paymentId }, + include: { address: true } + }) +} + +export const cleanupExpiredClientPayments = async (): Promise => { + const cutoff = moment.utc().subtract(CLIENT_PAYMENT_EXPIRATION_TIME, 'milliseconds').toDate() + + const oldPaymentsUnpaid = await prisma.clientPayment.findMany({ + where: { + status: 'PENDING', + createdAt: { lt: cutoff } + }, + select: { paymentId: true } + }) + + if (oldPaymentsUnpaid.length === 0) { + console.log('[CLIENT_PAYMENT CLEANUP] no expired pending payments found.') + return + } + + console.log(`[CLIENT_PAYMENT CLEANUP] deleting ${oldPaymentsUnpaid.length} expired pending payments...`) + await prisma.clientPayment.deleteMany({ + where: { + paymentId: { in: oldPaymentsUnpaid.map(p => p.paymentId) } + } + }) +} diff --git a/services/transactionService.ts b/services/transactionService.ts index 49e1b3ef..044287eb 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -1,6 +1,6 @@ import prisma from 'prisma-local/clientInstance' -import { Prisma, Transaction, ClientPaymentStatus } from '@prisma/client' -import { RESPONSE_MESSAGES, USD_QUOTE_ID, CAD_QUOTE_ID, N_OF_QUOTES, UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT, SupportedQuotesType, NETWORK_IDS, NETWORK_IDS_FROM_SLUGS, PRICES_CONNECTION_BATCH_SIZE, PRICES_CONNECTION_TIMEOUT } from 'constants/index' +import { Prisma, Transaction } from '@prisma/client' +import { RESPONSE_MESSAGES, USD_QUOTE_ID, CAD_QUOTE_ID, N_OF_QUOTES, UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT, SupportedQuotesType, NETWORK_IDS, PRICES_CONNECTION_BATCH_SIZE, PRICES_CONNECTION_TIMEOUT } from 'constants/index' import { fetchAddressBySubstring, fetchAddressById, fetchAddressesByPaybuttonId, addressExists } from 'services/addressService' import { AllPrices, QuoteValues, fetchPricesForNetworkAndTimestamp, flattenTimestamp } from 'services/priceService' import _ from 'lodash' @@ -9,8 +9,6 @@ import { SimplifiedTransaction } from 'ws-service/types' import { OpReturnData, parseAddress } from 'utils/validators' import { generatePaymentFromTxWithInvoices } from 'redis/paymentCache' import { ButtonDisplayData, Payment } from 'redis/types' -import { v4 as uuidv4 } from 'uuid' -import { multiBlockchainClient } from 'services/chronikService' export function getTransactionValue (transaction: TransactionWithPrices | TransactionsWithPaybuttonsAndPrices | SimplifiedTransaction): QuoteValues { const ret: QuoteValues = { @@ -976,53 +974,3 @@ export const fetchDistinctPaymentYearsByUser = async (userId: string): Promise y.year) } - -export const generatePaymentId = async (address: string, amount?: Prisma.Decimal): Promise => { - const rawUUID = uuidv4() - const cleanUUID = rawUUID.replace(/-/g, '') - const status = 'PENDING' as ClientPaymentStatus - address = parseAddress(address) - const prefix = address.split(':')[0].toLowerCase() - const networkId = NETWORK_IDS_FROM_SLUGS[prefix] - const isAddressRegistered = await addressExists(address) - - const clientPayment = await prisma.clientPayment.create({ - data: { - address: { - connectOrCreate: { - where: { address }, - create: { - address, - networkId - } - } - }, - paymentId: cleanUUID, - status, - amount - }, - include: { - address: true - } - }) - - if (!isAddressRegistered) { - void multiBlockchainClient.syncAndSubscribeAddresses([clientPayment.address]) - } - - return clientPayment.paymentId -} - -export const updateClientPaymentStatus = async (paymentId: string, status: ClientPaymentStatus): Promise => { - await prisma.clientPayment.update({ - where: { paymentId }, - data: { status } - }) -} - -export const getClientPayment = async (paymentId: string): Promise | null> => { - return await prisma.clientPayment.findUnique({ - where: { paymentId }, - include: { address: true } - }) -} diff --git a/tests/unittests/handleUpdateClientPaymentStatus.test.ts b/tests/unittests/handleUpdateClientPaymentStatus.test.ts index c00d9cde..5366876d 100644 --- a/tests/unittests/handleUpdateClientPaymentStatus.test.ts +++ b/tests/unittests/handleUpdateClientPaymentStatus.test.ts @@ -4,8 +4,8 @@ import { ChronikBlockchainClient } from '../../services/chronikService' process.env.WS_AUTH_KEY = 'test-auth-key' -// Mock the transactionService functions -jest.mock('../../services/transactionService', () => ({ +// Mock the clientPayment functions +jest.mock('../../services/clientPaymentService', () => ({ getClientPayment: jest.fn(), updateClientPaymentStatus: jest.fn() })) @@ -71,9 +71,9 @@ describe('handleUpdateClientPaymentStatus tests', () => { // Get the mocked functions // eslint-disable-next-line @typescript-eslint/no-var-requires - const transactionService = require('../../services/transactionService') - mockGetClientPayment = transactionService.getClientPayment - mockUpdateClientPaymentStatus = transactionService.updateClientPaymentStatus + const clientPaymentService = require('../../services/clientPaymentService') + mockGetClientPayment = clientPaymentService.getClientPayment + mockUpdateClientPaymentStatus = clientPaymentService.updateClientPaymentStatus // Clear all mocks before each test jest.clearAllMocks()