Skip to content

Commit 6332239

Browse files
committed
refactor: cleanup comments
1 parent 85d535e commit 6332239

File tree

3 files changed

+78
-101
lines changed

3 files changed

+78
-101
lines changed

services/chronikService.ts

Lines changed: 43 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -288,24 +288,21 @@ export class ChronikBlockchainClient {
288288
return (await this.chronik.script(type, hash160).history(page, pageSize)).txs
289289
}
290290

291-
// For each address, fetch PAGE_CONCURRENCY pages in parallel (“burst”),
292-
// then use the burst’s newest/oldest timestamps to decide whether to continue.
293-
// Yields happen only in the generator body (after each slice finishes, and at final flush).
291+
/*
292+
* For each address, fetch PAGE_CONCURRENCY pages in parallel (“burst”),
293+
* then use the burst’s newest/oldest timestamps to decide whether to continue.
294+
* Yields happen only in the generator body (after each slice finishes, and at final flush).
295+
*/
294296
private async * fetchLatestTxsForAddresses (
295297
addresses: Address[]
296298
): AsyncGenerator<Prisma.TransactionUncheckedCreateInput[]> {
297-
// 1024 -> 7:10m
298-
// 512 -> 7:34.175
299-
// 256 -> 7:47.876
300-
const pagesPerBurstPerAddress = 1 // pageConcurrency
301299
const logPrefix = `${this.CHRONIK_MSG_PREFIX}[PARALLEL FETCHING]`
302300

303301
console.log(
304302
`${logPrefix}: Will fetch latest txs for ${addresses.length} addresses ` +
305-
`(addressConcurrency=${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY}, pageConcurrency=${pagesPerBurstPerAddress}).`
303+
`(addressConcurrency=${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY}, pageConcurrency=1).`
306304
)
307305

308-
// Shared accumulation buffer — emitted only at the top-level generator yields
309306
let transactionsToEmitBuffer: Prisma.TransactionUncheckedCreateInput[] = []
310307

311308
for (let i = 0; i < addresses.length; i += INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY) {
@@ -321,82 +318,59 @@ export class ChronikBlockchainClient {
321318
let hasReachedStoppingCondition = false
322319

323320
while (!hasReachedStoppingCondition) {
324-
const pageIndicesInBurst = Array.from(
325-
{ length: pagesPerBurstPerAddress },
326-
(_, k) => nextBurstBasePageIndex + k
327-
)
328-
329-
// Fetch one "burst" of pages for this address in parallel.
330-
// Swallow individual page errors so one failing page doesn't cancel the address worker.
331-
const burstFetchResults = await Promise.all(
332-
pageIndicesInBurst.map(async (pageIndex) => {
333-
try {
334-
const value = await this.getPaginatedTxs(address.address, pageIndex, CHRONIK_FETCH_N_TXS_PER_PAGE)
335-
return { page: pageIndex, value }
336-
} catch (err: any) {
337-
console.warn(`${addrLogPrefix} page=${pageIndex} failed: ${err?.message as string ?? err as string}`)
338-
return { page: pageIndex, value: [] as any[] }
339-
}
340-
})
341-
)
321+
const pageIndex = nextBurstBasePageIndex
322+
323+
// Fetch the single page for this burst; swallow page errors.
324+
let pageTxs: any[] = []
325+
try {
326+
pageTxs = await this.getPaginatedTxs(address.address, pageIndex, CHRONIK_FETCH_N_TXS_PER_PAGE)
327+
} catch (err: any) {
328+
console.warn(`${addrLogPrefix} page=${pageIndex} failed: ${err?.message as string ?? err as string}`)
329+
pageTxs = []
330+
}
342331

343-
// Only consider non-empty pages for timestamp bounds and processing.
344-
const nonEmptyPageResults = burstFetchResults.filter(r => r.value.length > 0)
345-
if (nonEmptyPageResults.length === 0) {
332+
// If the page is empty, treat as "EMPTY ADDRESS" and stop.
333+
if (pageTxs.length === 0) {
346334
console.log(`${addrLogPrefix} EMPTY ADDRESS`)
347-
break // nothing in this burst -> done with this address
335+
break
348336
}
349337

350-
// Determine burst-level timestamp bounds
351-
const newestTimestampAcrossBurst = Math.max(
352-
...nonEmptyPageResults.map(r => Number(r.value[0].block?.timestamp ?? r.value[0].timeFirstSeen))
353-
)
354-
const oldestTimestampAcrossBurst = Math.min(
355-
...nonEmptyPageResults.map(r => {
356-
const pageTxs = r.value
357-
const lastTx = pageTxs[pageTxs.length - 1]
358-
return Number(lastTx.block?.timestamp ?? lastTx.timeFirstSeen)
359-
})
360-
)
338+
// Burst-level bounds collapse to this single page
339+
const newestTimestampAcrossBurst = Number(pageTxs[0].block?.timestamp ?? pageTxs[0].timeFirstSeen)
340+
const lastTxInPage = pageTxs[pageTxs.length - 1]
341+
const oldestTimestampAcrossBurst = Number(lastTxInPage.block?.timestamp ?? lastTxInPage.timeFirstSeen)
361342

362-
// If the newest tx across the entire burst is older than our last sync, we can quit immediately.
343+
// If even the newest is older than lastSync, stop.
363344
if (newestTimestampAcrossBurst < lastSyncedTimestampSeconds) {
364345
console.log(`${addrLogPrefix} NO NEW TXS`)
365346
break
366347
}
367348

368-
// Process each non-empty page in ascending page order.
349+
// Process this page
369350
let keptTransactionsInBurstCount = 0
370-
for (const pageResult of nonEmptyPageResults.sort((a, b) => a.page - b.page)) {
371-
const pageTxs = pageResult.value
372-
373-
const filteredTxs = pageTxs
374-
.filter(txThresholdFilter)
375-
.filter(t => t.block === undefined || t.block.timestamp >= lastSyncedTimestampSeconds)
376-
377-
if (filteredTxs.length > 0) {
378-
const txRowsToCreate = await Promise.all(
379-
filteredTxs.map(async t => await this.getTransactionFromChronikTransaction(t, address))
380-
)
381-
transactionsToEmitBuffer.push(...txRowsToCreate)
382-
keptTransactionsInBurstCount += txRowsToCreate.length
383-
}
384351

385-
// If this page’s oldest tx is already older than our last sync, we can stop after the burst.
386-
const oldestTimestampInPage = Number(
387-
pageTxs[pageTxs.length - 1].block?.timestamp ?? pageTxs[pageTxs.length - 1].timeFirstSeen
352+
const filteredTxs = pageTxs
353+
.filter(txThresholdFilter)
354+
.filter(t => t.block === undefined || t.block.timestamp >= lastSyncedTimestampSeconds)
355+
356+
if (filteredTxs.length > 0) {
357+
const txRowsToCreate = await Promise.all(
358+
filteredTxs.map(async t => await this.getTransactionFromChronikTransaction(t, address))
388359
)
389-
if (oldestTimestampInPage < lastSyncedTimestampSeconds) {
390-
hasReachedStoppingCondition = true
391-
// continue the for-loop to finish logging consistently
392-
}
360+
transactionsToEmitBuffer.push(...txRowsToCreate)
361+
keptTransactionsInBurstCount += txRowsToCreate.length
362+
}
363+
364+
// If the page’s oldest tx is older than lastSync, stop after this burst.
365+
const oldestTimestampInPage = oldestTimestampAcrossBurst
366+
if (oldestTimestampInPage < lastSyncedTimestampSeconds) {
367+
hasReachedStoppingCondition = true
393368
}
394369

395370
console.log(`${addrLogPrefix} ${keptTransactionsInBurstCount} new txs...`)
396371

397-
nextBurstBasePageIndex += pagesPerBurstPerAddress
372+
nextBurstBasePageIndex += 1
398373

399-
// Fast-stop: if we kept nothing in this burst and the burst’s oldest is older than lastSync, we’re done.
400374
if (keptTransactionsInBurstCount === 0 && oldestTimestampAcrossBurst < lastSyncedTimestampSeconds) {
401375
hasReachedStoppingCondition = true
402376
}
@@ -719,19 +693,16 @@ export class ChronikBlockchainClient {
719693
console.time(`${this.CHRONIK_MSG_PREFIX} syncAddresses`)
720694
await setSyncingBatch(addresses.map(a => a.address), true)
721695

722-
// per-address counters
723696
const perAddrCount = new Map<string, number>()
724697
addresses.forEach(a => perAddrCount.set(a.id, 0))
725698

726-
// commit buffer
727699
let toCommit: Prisma.TransactionUncheckedCreateInput[] = []
728700

729701
try {
730702
const pfx = `${this.CHRONIK_MSG_PREFIX}[PARALLEL FETCHING]`
731-
// consume generator: it yields batches of prepared txs
732-
console.log(`${pfx} will fetch batches of ${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY} txs from chronik`)
703+
console.log(`${pfx} will fetch batches of ${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY} addresses from chronik`)
733704
for await (const batch of this.fetchLatestTxsForAddresses(addresses)) {
734-
console.log(`${pfx} fetched batch of ${batch.length} tx from chronik`)
705+
console.log(`${pfx} fetched batch of ${batch.length} txs from chronik`)
735706
// count per address before committing
736707
for (const tx of batch) {
737708
perAddrCount.set(tx.addressId, (perAddrCount.get(tx.addressId) ?? 0) + 1)
@@ -753,13 +724,11 @@ export class ChronikBlockchainClient {
753724

754725
// broadcast/triggers after commit
755726
if (created.length > 0) {
756-
// Always broadcast per tx (keeps existing behavior)
757727
const triggerBatch: BroadcastTxData[] = []
758728
for (const tx of created) {
759729
const bd = this.broadcastIncomingTx(tx.address.address, tx, []) // inputAddresses left empty in bulk
760730
triggerBatch.push(bd)
761731
}
762-
// Then, if enabled, execute triggers **in batch**
763732
if (runTriggers) {
764733
console.log(`${this.CHRONIK_MSG_PREFIX} executing trigger batch — broadcasts=${triggerBatch.length}`)
765734
await executeTriggersBatch(triggerBatch, this.networkId)

services/transactionService.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ async function createPriceTxConnectionInChunks (
404404
const slice = rows.slice(i, i + PRICES_CONNECTION_BATCH_SIZE)
405405
await client.pricesOnTransactions.createMany({
406406
data: slice,
407-
skipDuplicates: true // respects unique(priceId, transactionId); safe like upsert
407+
skipDuplicates: true
408408
})
409409
}
410410
console.log('[PRICES] Inserted all price links.')
@@ -428,7 +428,7 @@ export async function connectTransactionsListToPrices (txList: Transaction[]): P
428428

429429
console.log(`[PRICES] Preparing to connect ${txList.length} txs to prices...`)
430430

431-
// Collect unique (networkId -> Set<flattenedTimestamp>)
431+
// collect UNIQUE (networkId, timestamp) pairs
432432
const networkIdToTimestamps = new Map<number, Set<number>>()
433433
await Promise.all(txList.map(async (t) => {
434434
const networkId = await getTransactionNetworkId(t)
@@ -438,13 +438,13 @@ export async function connectTransactionsListToPrices (txList: Transaction[]): P
438438
networkIdToTimestamps.set(networkId, set)
439439
}))
440440

441-
// Fetch AllPrices for each unique (networkId, ts)
441+
// fetch AllPrices for each unique (networkId, timestamp)
442442
const timestampToPrice: Record<number, AllPrices> = {}
443443
let pairs = 0
444-
for (const [networkId, stamps] of networkIdToTimestamps.entries()) {
445-
for (const ts of stamps) {
444+
for (const [networkId, timestamps] of networkIdToTimestamps.entries()) {
445+
for (const ts of timestamps) {
446446
pairs++
447-
// outside any ITX; fine to run in parallel-ish
447+
// intentionally sequential — each fetch is awaited before the next
448448
const allPrices = await fetchPricesForNetworkAndTimestamp(networkId, ts, prisma)
449449
timestampToPrice[ts] = allPrices
450450
}
@@ -460,7 +460,6 @@ export async function connectTransactionsListToPrices (txList: Transaction[]): P
460460
}
461461
console.log(`[PRICES] Built ${rows.length} price links (2 per tx).`)
462462

463-
// One deleteMany + chunked createMany inside a single interactive tx
464463
await prisma.$transaction(async (tx) => {
465464
console.log(`[PRICES] Disconnecting existing price links for ${txList.length} txs...`)
466465
await tx.pricesOnTransactions.deleteMany({

services/triggerService.ts

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -274,16 +274,16 @@ async function fetchTriggersGroupedByAddress (addresses: string[]): Promise<Map<
274274
include: triggerWithPaybuttonAndUserInclude
275275
})
276276

277-
const byAddr = new Map<string, TriggerWithPaybuttonAndUser[]>()
277+
const triggersByAddress = new Map<string, TriggerWithPaybuttonAndUser[]>()
278278
for (const t of triggers as TriggerWithPaybuttonAndUser[]) {
279279
for (const conn of t.paybutton.addresses) {
280280
const addr = conn.address.address
281-
const arr = byAddr.get(addr) ?? []
281+
const arr = triggersByAddress.get(addr) ?? []
282282
arr.push(t)
283-
byAddr.set(addr, arr)
283+
triggersByAddress.set(addr, arr)
284284
}
285285
}
286-
return byAddr
286+
return triggersByAddress
287287
}
288288

289289
function buildPostParams (
@@ -391,7 +391,7 @@ async function persistLogsAndDecrements (
391391
}
392392
}
393393

394-
// 2) Then try credits in a separate transaction
394+
// 2) try credits in a separate transaction
395395
const userIds = Array.from(new Set([
396396
...Object.keys(acceptedEmailPerUser),
397397
...Object.keys(acceptedPostsPerUser)
@@ -410,16 +410,25 @@ async function persistLogsAndDecrements (
410410
for (const id of userIds) {
411411
const row = byId.get(id)
412412
if (row == null) continue
413-
const reqEmail = acceptedEmailPerUser[id] ?? 0
414-
const reqPost = acceptedPostsPerUser[id] ?? 0
415-
const decEmail = Math.min(Math.max(reqEmail, 0), row.emailCredits ?? 0)
416-
const decPost = Math.min(Math.max(reqPost, 0), row.postCredits ?? 0)
417-
if (decEmail > 0 || decPost > 0) {
413+
const requestedEmailCredits = acceptedEmailPerUser[id] ?? 0
414+
const requestedPostCredits = acceptedPostsPerUser[id] ?? 0
415+
416+
// Clamp to [0, availableCredits]
417+
const emailCreditsToDecrement = Math.min(
418+
Math.max(requestedEmailCredits, 0),
419+
row.emailCredits ?? 0
420+
)
421+
422+
const postCreditsToDecrement = Math.min(
423+
Math.max(requestedPostCredits, 0),
424+
row.postCredits ?? 0
425+
)
426+
if (emailCreditsToDecrement > 0 || postCreditsToDecrement > 0) {
418427
updates.push(tx.userProfile.update({
419428
where: { id },
420429
data: {
421-
...(decEmail > 0 ? { emailCredits: { decrement: decEmail } } : {}),
422-
...(decPost > 0 ? { postCredits: { decrement: decPost } } : {})
430+
...(emailCreditsToDecrement > 0 ? { emailCredits: { decrement: emailCreditsToDecrement } } : {}),
431+
...(postCreditsToDecrement > 0 ? { postCredits: { decrement: postCreditsToDecrement } } : {})
423432
}
424433
}))
425434
}
@@ -428,7 +437,7 @@ async function persistLogsAndDecrements (
428437
})
429438
} catch (e: any) {
430439
console.error(`[TRIGGER]: credit decrement tx failed: ${e?.message as string ?? e as string}`)
431-
// logs already written; we don’t rollback them
440+
// logs already written; they are intentionally not rolled back
432441
}
433442
}
434443

@@ -466,20 +475,20 @@ function appendOutOfCreditsLogs (
466475
queue: TriggerTask[],
467476
startIndex: number,
468477
logs: Prisma.TriggerLogCreateManyInput[],
469-
kind: 'PostData' | 'SendEmail'
478+
triggerType: 'PostData' | 'SendEmail'
470479
): void {
471480
const msg =
472-
kind === 'PostData'
481+
triggerType === 'PostData'
473482
? RESPONSE_MESSAGES.USER_OUT_OF_POST_CREDITS_400.message
474483
: RESPONSE_MESSAGES.USER_OUT_OF_EMAIL_CREDITS_400.message
475484

476485
for (let i = startIndex; i < queue.length; i++) {
477486
logs.push({
478487
triggerId: queue[i].triggerId,
479488
isError: true,
480-
actionType: kind,
489+
actionType: triggerType,
481490
data: JSON.stringify({
482-
errorName: kind === 'PostData' ? 'USER_OUT_OF_POST_CREDITS' : 'USER_OUT_OF_EMAIL_CREDITS',
491+
errorName: triggerType === 'PostData' ? 'USER_OUT_OF_POST_CREDITS' : 'USER_OUT_OF_EMAIL_CREDITS',
483492
errorMessage: msg,
484493
errorStack: ''
485494
})
@@ -489,7 +498,7 @@ function appendOutOfCreditsLogs (
489498

490499
export async function executeTriggersBatch (broadcasts: BroadcastTxData[], networkId: number): Promise<void> {
491500
if (process.env.DONT_EXECUTE_TRIGGERS === 'true') {
492-
console.log(`DONT_EXECUTE_TRIGGERS in env, skipping batch execution of triggers ${broadcasts.length}`)
501+
console.log(`DONT_EXECUTE_TRIGGERS in env, skipping batch execution of ${broadcasts.length} triggers`)
493502
return
494503
}
495504

@@ -509,7 +518,7 @@ export async function executeTriggersBatch (broadcasts: BroadcastTxData[], netwo
509518
const logs: Prisma.TriggerLogCreateManyInput[] = []
510519

511520
// Build queues
512-
console.log(`[TRIGGER ${currency}]: preparing batch — txItems=${txItems.length} addresses=${uniqueAddresses.length}`)
521+
console.log(`[TRIGGER ${currency}]: preparing batch — txs=${txItems.length} addresses=${uniqueAddresses.length}`)
513522

514523
for (const { address, tx } of txItems) {
515524
const triggers = triggersByAddress.get(address) ?? []
@@ -583,7 +592,7 @@ export async function executeTriggersBatch (broadcasts: BroadcastTxData[], netwo
583592
}
584593
}
585594

586-
// Build accepted maps for decrements (charge only accepted)
595+
// count accepted triggers for decrements (charge only accepted)
587596
const postsAcceptedByUser = Object.fromEntries(postResults.map(r => [r.userId, r.accepted]))
588597
const emailsAcceptedByUser = Object.fromEntries(emailResults.map(r => [r.userId, r.accepted]))
589598

0 commit comments

Comments
 (0)