diff --git a/.gitignore b/.gitignore index 581e8de..a71753d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ node_modules/ .vscode/ build/ +.claude/ diff --git a/src/batchBuilder.ts b/src/batchBuilder.ts new file mode 100644 index 0000000..e7c65df --- /dev/null +++ b/src/batchBuilder.ts @@ -0,0 +1,68 @@ +import {Interface} from 'ethers'; +import {Call} from './contracts/caller-client'; +import {OxString, SplitsReceiver} from './types'; +import appSettings from './appSettings'; +import {dripsAbi} from './contracts/drips-abi'; + +const dripsInterface = new Interface(dripsAbi); +const dripsAddress = appSettings.network.contracts.drips; + +export function buildReceiveStreamsCall( + accountId: bigint, + token: OxString, + maxCycles: number, +): Call { + return { + target: dripsAddress, + data: dripsInterface.encodeFunctionData('receiveStreams', [ + accountId, + token, + maxCycles, + ]), + value: 0n, + }; +} + +export function buildSplitCall( + accountId: bigint, + token: OxString, + receivers: SplitsReceiver[], +): Call { + // Convert SplitsReceiver[] to the format expected by the contract + const contractReceivers = receivers.map(r => ({ + accountId: r.accountId, + weight: r.weight, + })); + + return { + target: dripsAddress, + data: dripsInterface.encodeFunctionData('split', [ + accountId, + token, + contractReceivers, + ]), + value: 0n, + }; +} + +/** + * Splits an array into chunks of a given size + * @param array - The array to chunk + * @param size - The size of each chunk (must be a positive integer) + * @returns Array of chunks + */ +export function chunk(array: T[], size: number): T[][] { + if (array.length === 0) { + return []; + } + + if (!Number.isInteger(size) || size <= 0) { + throw new Error(`Chunk size must be a positive integer, got: ${size}`); + } + + const chunks: T[][] = []; + for (let i = 0; i < array.length; i += size) { + chunks.push(array.slice(i, i + size)); + } + return chunks; +} diff --git a/src/contracts/caller-abi.ts b/src/contracts/caller-abi.ts new file mode 100644 index 0000000..c0787be --- /dev/null +++ b/src/contracts/caller-abi.ts @@ -0,0 +1,22 @@ +export const callerAbi = [ + { + inputs: [ + { + components: [ + {internalType: 'address', name: 'target', type: 'address'}, + {internalType: 'bytes', name: 'data', type: 'bytes'}, + {internalType: 'uint256', name: 'value', type: 'uint256'}, + ], + internalType: 'struct Call[]', + name: 'calls', + type: 'tuple[]', + }, + ], + name: 'callBatched', + outputs: [{internalType: 'bytes[]', name: 'returnData', type: 'bytes[]'}], + stateMutability: 'nonpayable', + type: 'function', + }, +] as const; + +export type CallerAbi = typeof callerAbi; diff --git a/src/contracts/caller-client.ts b/src/contracts/caller-client.ts new file mode 100644 index 0000000..7f2f265 --- /dev/null +++ b/src/contracts/caller-client.ts @@ -0,0 +1,57 @@ +import {Contract, TransactionResponse, ZeroAddress} from 'ethers'; +import appSettings from '../appSettings'; +import {getContractRunner} from '../getWalletInstance'; +import {callerAbi} from './caller-abi'; + +const { + network: { + contracts: {caller: contractAddress}, + name: networkName, + }, +} = appSettings; + +let contractInstance: Contract | null = null; + +export type Call = { + target: string; + data: string; + value: bigint; +}; + +async function getCallerContract(): Promise { + if (contractInstance) { + return contractInstance; + } + + if (!contractAddress || contractAddress === ZeroAddress) { + throw new Error( + `No Caller contract address configured for chain: ${networkName}`, + ); + } + + try { + contractInstance = new Contract( + contractAddress, + callerAbi, + await getContractRunner(), + ); + return contractInstance; + } catch (error) { + throw new Error( + `Failed to initialize Caller contract: ${error instanceof Error ? error.message : 'Unknown error'}`, + ); + } +} + +export async function callerBatchedCall( + calls: Call[], +): Promise { + try { + const caller = await getCallerContract(); + return await caller.callBatched(calls); + } catch (error) { + throw new Error( + `Caller.callBatched failed: ${error instanceof Error ? error.message : String(error)}`, + ); + } +} diff --git a/src/getNetwork.ts b/src/getNetwork.ts index 3d349e6..728d623 100644 --- a/src/getNetwork.ts +++ b/src/getNetwork.ts @@ -12,6 +12,7 @@ export default function getNetwork(chain: ChainId): Network { contracts: { drips: '0xd0Dd053392db676D57317CD4fe96Fc2cCf42D0b4', repoSubAccountDriver: '0xc219395880fa72e3ad9180b8878e0d39d144130b', + caller: '0x60F25ac5F289Dc7F640f948521d486C964A248e5', }, }; case 11155111: @@ -22,6 +23,7 @@ export default function getNetwork(chain: ChainId): Network { contracts: { drips: '0x74A32a38D945b9527524900429b083547DeB9bF4', repoSubAccountDriver: '0x317400fd9dfdad78d53a34455d89beb8f03f90ee', + caller: '0x09e04Cb8168bd0E8773A79Cc2099f19C46776Fee', }, }; case 314: @@ -32,6 +34,7 @@ export default function getNetwork(chain: ChainId): Network { contracts: { drips: '0xd320F59F109c618b19707ea5C5F068020eA333B3', repoSubAccountDriver: '0x925a69f6d07ee4c753df139bcc2a946e1d1ee92a', + caller: '0xd6Ab8e72dE3742d45AdF108fAa112Cd232718828', }, }; case 1088: @@ -42,6 +45,7 @@ export default function getNetwork(chain: ChainId): Network { contracts: { drips: '0xd320F59F109c618b19707ea5C5F068020eA333B3', repoSubAccountDriver: '0x925a69f6d07ee4c753df139bcc2a946e1d1ee92a', + caller: '0xd6Ab8e72dE3742d45AdF108fAa112Cd232718828', }, }; case 10: @@ -52,6 +56,7 @@ export default function getNetwork(chain: ChainId): Network { contracts: { drips: '0xd320F59F109c618b19707ea5C5F068020eA333B3', repoSubAccountDriver: '0x925a69f6d07ee4c753df139bcc2a946e1d1ee92a', + caller: '0xd6Ab8e72dE3742d45AdF108fAa112Cd232718828', }, }; case 31337: @@ -62,6 +67,10 @@ export default function getNetwork(chain: ChainId): Network { contracts: { drips: '0x7CBbD3FdF9E5eb359E6D9B12848c5Faa81629944', repoSubAccountDriver: '0xB8743C2bB8DF7399273aa7EE4cE8d4109Bec327F', + // Override via LOCAL_CALLER_ADDRESS env var when testing batch functionality + caller: + process.env.LOCAL_CALLER_ADDRESS ?? + '0x0000000000000000000000000000000000000000', }, }; } diff --git a/src/index.ts b/src/index.ts index 9b9d85a..2df5f23 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,14 +12,19 @@ import { ProcessingResult, SplitsReceiver, WriteOperation, + assertOxString, } from './types'; import {notifyDiscord} from './notifyDiscord'; import {getAllProjectsAndSubProjectSortedByCreationDate} from './queries/getAllProjectsSortedByCreationDate'; import {getAllOrcidAccountsWithSplits} from './queries/getAllOrcidAccountsWithSplits'; import {isRepoSubAccountDriverId} from './isRepoSubAccountDriverId'; +import {callerBatchedCall, Call} from './contracts/caller-client'; +import {buildReceiveStreamsCall, buildSplitCall, chunk} from './batchBuilder'; const MAX_CYCLES = 1000; -const SCRIPT_ITERATIONS = 3; +const SCRIPT_ITERATIONS = 10; +const BATCH_SIZE = 15; +const VALIDATION_CONCURRENCY = 5; // Parallel RPC reads during pre-validation async function checkTotalWeight( splitsReceivers: SplitsReceiver[], @@ -58,6 +63,293 @@ function doIfNotDryRun(fn: () => Promise): Promise | null { return fn(); } +type ValidatedAccount = { + accountId: bigint; + splitsReceivers: SplitsReceiver[]; + receivable: bigint; + splittable: bigint; + type: 'dripList' | 'project' | 'orcidAccount'; +}; + +async function validateSingleAccount( + account: { + accountId: bigint; + splitsReceivers: SplitsReceiver[]; + type: 'dripList' | 'project' | 'orcidAccount'; + }, + token: string, +): Promise { + try { + // Parallel reads for this account + const [receivable, splittable] = await Promise.all([ + dripsReadContract({ + functionName: 'receiveStreamsResult', + args: [account.accountId, token as OxString, MAX_CYCLES], + }), + dripsReadContract({ + functionName: 'splittable', + args: [account.accountId, token as OxString], + }), + ]); + + if (receivable > 0n || splittable > 0n) { + console.log( + ` Account ...${account.accountId.toString().slice(-8)}: receivable=${formatEther(receivable)}, splittable=${formatEther(splittable)} ✓`, + ); + return {...account, receivable, splittable}; + } + return null; + } catch (error) { + console.warn( + ` Pre-validation failed for account ${account.accountId}: ${error instanceof Error ? error.message : String(error)}`, + ); + return null; + } +} + +async function preValidateAccounts( + accounts: Array<{ + accountId: bigint; + splitsReceivers: SplitsReceiver[]; + type: 'dripList' | 'project' | 'orcidAccount'; + }>, + token: string, +): Promise { + const validated: ValidatedAccount[] = []; + const total = accounts.length; + const batches = chunk(accounts, VALIDATION_CONCURRENCY); + + let processed = 0; + for (const batch of batches) { + // Validate batch in parallel + const results = await Promise.all( + batch.map(account => validateSingleAccount(account, token)), + ); + + // Collect non-null results + for (const result of results) { + if (result) { + validated.push(result); + } + } + + processed += batch.length; + console.log(` Pre-validated: ${processed}/${total}`); + } + + return validated; +} + +async function executeBatch( + batch: ValidatedAccount[], + token: OxString, +): Promise { + const writeOperations: WriteOperation[] = []; + const calls: Call[] = []; + + // Track which accounts actually had their calls added (for accurate logging) + const accountsWithReceive: Set = new Set(); + const accountsWithSplit: Set = new Set(); + + // Build calls for each account: receive first, then split + for (const account of batch) { + if (account.receivable > 0n) { + calls.push(buildReceiveStreamsCall(account.accountId, token, MAX_CYCLES)); + accountsWithReceive.add(account.accountId); + } + // Split if there's splittable or if we just received something + if (account.splittable > 0n || account.receivable > 0n) { + // Safety net: validate weights before building split call + // (Primary validation should happen in processDripLists/processProjects) + const totalWeight = account.splitsReceivers.reduce( + (acc, r) => acc + r.weight, + 0, + ); + if (totalWeight !== 1000000 && totalWeight !== 0) { + const message = `Weights Mismatch: Account ${account.accountId} has invalid total weight ${totalWeight}. Skipping split.`; + console.warn(message); + void notifyDiscord(message); + continue; + } + calls.push( + buildSplitCall(account.accountId, token, account.splitsReceivers), + ); + accountsWithSplit.add(account.accountId); + } + } + + if (calls.length === 0) { + return writeOperations; + } + + const accountIds = batch + .map(a => a.accountId.toString().slice(-8)) + .join(', '); + console.log( + ` Executing batch: ${calls.length} calls for ${batch.length} accounts (ids: ...${accountIds})`, + ); + console.log(' Sending transaction...'); + + const txResponse = await doIfNotDryRun(() => callerBatchedCall(calls)); + + if (txResponse) { + console.log(` Transaction sent: ${txResponse.hash}`); + console.log(' Waiting for confirmation...'); + const receipt = await retry(async () => txResponse.wait(1)); + + if (!receipt || receipt.status === 0) { + throw new Error( + `Transaction reverted: ${txResponse.hash}. Check block explorer for details.`, + ); + } + + // Log write operations only for accounts whose calls were actually included + for (const account of batch) { + if (accountsWithReceive.has(account.accountId)) { + writeOperations.push({ + type: 'receive', + accountId: account.accountId, + token, + amount: account.receivable, + txHash: txResponse.hash, + }); + } + if (accountsWithSplit.has(account.accountId)) { + writeOperations.push({ + type: 'split', + accountId: account.accountId, + token, + amount: account.splittable + account.receivable, + txHash: txResponse.hash, + }); + } + } + + console.log(` ✓ Batch confirmed: ${batch.length} accounts processed`); + } else { + console.log(' [DRY RUN] Would execute batch transaction'); + } + + return writeOperations; +} + +type RetryResult = { + writeOperations: WriteOperation[]; + succeededAccounts: bigint[]; + failedAccounts: Array<{accountId: bigint; error: string}>; +}; + +async function retryBatchIndividually( + batch: ValidatedAccount[], + token: string, +): Promise { + console.log( + `Retrying ${batch.length} accounts individually after batch failure...`, + ); + const writeOperations: WriteOperation[] = []; + const succeededAccounts: bigint[] = []; + const failedAccounts: Array<{accountId: bigint; error: string}> = []; + + for (const account of batch) { + try { + const result = await processToken( + account.accountId, + token, + account.splitsReceivers, + account.type, + ); + writeOperations.push(...result.writeOperations); + succeededAccounts.push(account.accountId); + } catch (error) { + const errorMsg = error instanceof Error ? error.message : String(error); + failedAccounts.push({accountId: account.accountId, error: errorMsg}); + console.error( + `Individual retry failed for account ${account.accountId}: ${errorMsg}`, + ); + } + } + + console.log( + `Retry complete: ${succeededAccounts.length} succeeded, ${failedAccounts.length} failed`, + ); + + return {writeOperations, succeededAccounts, failedAccounts}; +} + +async function processBatchedTokens( + accounts: Array<{ + accountId: bigint; + splitsReceivers: SplitsReceiver[]; + type: 'dripList' | 'project' | 'orcidAccount'; + }>, + tokens: OxString[], +): Promise { + const writeOperations: WriteOperation[] = []; + + for (let t = 0; t < tokens.length; t++) { + const token = tokens[t]; + + // Validate token address format before processing + assertOxString(token); + + console.log(`\n Token ${t + 1}/${tokens.length}: ${token}`); + console.log(` Pre-validating ${accounts.length} accounts...`); + + // Pre-validate all accounts for this token + const validAccounts = await preValidateAccounts(accounts, token); + + if (validAccounts.length === 0) { + console.log(' No accounts with receivable/splittable funds. Skipping.'); + continue; + } + + // Split into batches + const batches = chunk(validAccounts, BATCH_SIZE); + console.log( + ` Found ${validAccounts.length} accounts to process → ${batches.length} batch(es)`, + ); + + for (let i = 0; i < batches.length; i++) { + const batch = batches[i]; + console.log( + `\n 📦 Batch ${i + 1}/${batches.length} (${batch.length} accounts)`, + ); + + try { + const batchOperations = await executeBatch(batch, token); + writeOperations.push(...batchOperations); + } catch (error) { + console.error( + `Batch ${i + 1} failed: ${error instanceof Error ? error.message : String(error)}`, + ); + + // Hybrid error handling: retry individually + const retryResult = await retryBatchIndividually(batch, token); + writeOperations.push(...retryResult.writeOperations); + + // Send notification AFTER retry with accurate results + const successCount = retryResult.succeededAccounts.length; + const failCount = retryResult.failedAccounts.length; + + if (failCount > 0) { + const failedIds = retryResult.failedAccounts + .map(f => `...${f.accountId.toString().slice(-8)}`) + .join(', '); + await notifyDiscord( + `⚠️ Batch ${i + 1} failed. Retry: ${successCount}/${batch.length} succeeded, ${failCount} failed permanently. Failed: ${failedIds}`, + ); + } else { + await notifyDiscord( + `✅ Batch ${i + 1} failed but all ${successCount} accounts succeeded on individual retry.`, + ); + } + } + } + } + + return {writeOperations}; +} + async function main(): Promise { if (!appSettings.shouldRun) { console.log('Script is disabled. Exiting...'); @@ -156,34 +448,56 @@ async function processDripLists( tokens: OxString[], ): Promise { console.log('\nProcessing drip lists...'); - const writeOperations: WriteOperation[] = []; const {rows: dripLists} = await getAllDripListsSortedByCreationDate(db); + // Collect all drip list accounts with their splits receivers + const accounts: Array<{ + accountId: bigint; + splitsReceivers: SplitsReceiver[]; + type: 'dripList' | 'project' | 'orcidAccount'; + }> = []; + for (const dripList of dripLists) { const dripListId = dripList.id.toString(); if (appSettings.accountIdsToSkip.includes(dripListId)) { console.log( `Skipping Drip List ${dripListId} as per ACCOUNT_IDS_TO_SKIP.`, ); - continue; // Skip to the next drip list + continue; } const splitsReceivers = await getCurrentSplitsReceivers(db, dripListId); - for (const token of tokens) { - const result = await processToken( - dripList.id, - token, - splitsReceivers, - 'dripList', + const weightsAreCorrect = await checkTotalWeight( + splitsReceivers, + 'dripList', + dripListId, + ); + + if ( + !weightsAreCorrect && + !appSettings.accountIdsToSplitDespiteWrongWeights.includes(dripListId) + ) { + console.warn( + `Drip List ${dripListId} will not be split because weights are incorrect and it's not in the exception list.`, ); - writeOperations.push(...result.writeOperations); + continue; } + + accounts.push({ + accountId: dripList.id, + splitsReceivers, + type: 'dripList', + }); } + console.log(`Collected ${accounts.length} drip lists for batched processing`); + + const result = await processBatchedTokens(accounts, tokens); + console.log('Completed processing drip lists'); - return {writeOperations}; + return result; } async function processProjects( @@ -191,15 +505,21 @@ async function processProjects( tokens: OxString[], ): Promise { console.log('\nProcessing projects...'); - const writeOperations: WriteOperation[] = []; const projects = await getAllProjectsAndSubProjectSortedByCreationDate(db); + // Collect all project accounts with their splits receivers + const accounts: Array<{ + accountId: bigint; + splitsReceivers: SplitsReceiver[]; + type: 'dripList' | 'project' | 'orcidAccount'; + }> = []; + for (const project of projects) { const projectId = project.id.toString(); if (appSettings.accountIdsToSkip.includes(projectId)) { console.log(`Skipping Project ${projectId} as per ACCOUNT_IDS_TO_SKIP.`); - continue; // Skip to the next project + continue; } const splitsReceivers = await getCurrentSplitsReceivers(db, projectId); @@ -210,7 +530,7 @@ async function processProjects( projectId, ); - // Skip processing tokens ONLY if weights are incorrect AND the project ID is NOT in the exception list + // Skip processing ONLY if weights are incorrect AND the project ID is NOT in the exception list if ( !weightsAreCorrect && !appSettings.accountIdsToSplitDespiteWrongWeights.includes(projectId) @@ -218,23 +538,22 @@ async function processProjects( console.warn( `Project ${projectId} will not be split because weights are incorrect and it's not in the exception list.`, ); - continue; // Skip the token processing loop for this project + continue; } - // If weightsAreCorrect is true OR the ID is in the exception list, proceed to the token loop... - for (const token of tokens) { - const result = await processToken( - project.id, - token, - splitsReceivers, - 'project', - ); - writeOperations.push(...result.writeOperations); - } + accounts.push({ + accountId: project.id, + splitsReceivers, + type: 'project', + }); } + console.log(`Collected ${accounts.length} projects for batched processing`); + + const result = await processBatchedTokens(accounts, tokens); + console.log('Completed processing projects'); - return {writeOperations}; + return result; } async function processOrcidAccounts( @@ -242,12 +561,18 @@ async function processOrcidAccounts( tokens: OxString[], ): Promise { console.log('\nProcessing ORCID accounts...'); - const writeOperations: WriteOperation[] = []; const orcidRows = await getAllOrcidAccountsWithSplits(db); console.log(`Found ${orcidRows.length} ORCID accounts to process`); + // Collect all ORCID accounts with their splits receivers + const accounts: Array<{ + accountId: bigint; + splitsReceivers: SplitsReceiver[]; + type: 'dripList' | 'project' | 'orcidAccount'; + }> = []; + for (const row of orcidRows) { const accountIdStr = row.accountId.toString(); @@ -277,19 +602,21 @@ async function processOrcidAccounts( }, ]; - for (const token of tokens) { - const result = await processToken( - row.accountId, - token, - splitsReceivers, - 'orcidAccount', - ); - writeOperations.push(...result.writeOperations); - } + accounts.push({ + accountId: row.accountId, + splitsReceivers, + type: 'orcidAccount', + }); } + console.log( + `Collected ${accounts.length} ORCID accounts for batched processing`, + ); + + const result = await processBatchedTokens(accounts, tokens); + console.log('Completed processing ORCID accounts'); - return {writeOperations}; + return result; } async function processToken( @@ -321,7 +648,13 @@ async function processToken( console.log( `Awaiting 'receiveStreams' transaction ${txResponse.hash} for ${entityDescription}...`, ); - await retry(async () => txResponse.wait(1)); + const receipt = await retry(async () => txResponse.wait(1)); + + if (!receipt || receipt.status === 0) { + throw new Error( + `receiveStreams transaction reverted: ${txResponse.hash}`, + ); + } writeOperations.push({ type: 'receive', @@ -363,7 +696,11 @@ async function processToken( console.log( `Awaiting 'split' transaction ${txResponse.hash} for ${entityDescription}...`, ); - await retry(async () => txResponse.wait(1)); + const receipt = await retry(async () => txResponse.wait(1)); + + if (!receipt || receipt.status === 0) { + throw new Error(`split transaction reverted: ${txResponse.hash}`); + } writeOperations.push({ type: 'split', diff --git a/src/types.ts b/src/types.ts index 91eaba8..c7d48e3 100644 --- a/src/types.ts +++ b/src/types.ts @@ -8,10 +8,24 @@ export type Network = { contracts: { drips: string; repoSubAccountDriver: string; + caller: string; }; }; export type OxString = `0x${string}`; + +/** + * Validates and asserts that a string is a valid OxString (0x-prefixed hex address) + * @throws Error if the string is not a valid 0x-prefixed hex address + */ +export function assertOxString(value: string): asserts value is OxString { + if (!/^0x[0-9a-fA-F]{40}$/.test(value)) { + throw new Error( + `Invalid address format: ${value}. Expected 0x-prefixed 40 hexadecimal characters (20-byte Ethereum address).`, + ); + } +} + export type SplitsReceiver = {accountId: bigint; weight: number}; export type WriteOperation = {