From 423025a561546642fa391804bdb00e8ad521cc82 Mon Sep 17 00:00:00 2001 From: Jacob Gadikian Date: Tue, 4 Mar 2025 23:28:04 +0800 Subject: [PATCH] unfinished --- README.md | 19 + broadcast/broadcast.go | 437 ++++++++++- broadcast/transaction.go | 1014 ++++++++++++++++++++----- broadcast/utils.go | 45 +- lib/account.go | 17 +- lib/chainregistry/cli.go | 6 +- lib/peerdiscovery/discovery.go | 62 +- main.go | 612 +-------------- main_test.go | 393 ---------- modes/registry/registry.go | 399 +++++----- modules/bank/multisend_distributor.go | 48 +- types/types.go | 3 +- 12 files changed, 1628 insertions(+), 1427 deletions(-) delete mode 100644 main_test.go diff --git a/README.md b/README.md index 37c26bd..b5fcbb0 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,25 @@ For advanced users who want more control, you can use a configuration file: meteorite -config -f your_config.toml ``` +### Configuration Options + +Meteorite supports several configuration options: + +- `balance_funds`: Boolean flag that controls whether funds should be automatically balanced between accounts. Default is `false`. + ```toml + balance_funds = true # Enable automatic fund balancing + ``` + +- `multisend`: Boolean flag to enable MultiSend transactions (sends to multiple recipients in one transaction). Default is `true`. + ```toml + multisend = true # Enable MultiSend mode + ``` + +- `num_multisend`: Number of recipients in each MultiSend transaction. Default is `3000`. + ```toml + num_multisend = 3000 # Send to 3000 recipients per transaction + ``` + ### Requirements - Create a file named `seedphrase` containing your seed phrase in the directory where you run meteorite diff --git a/broadcast/broadcast.go b/broadcast/broadcast.go index 76df261..0badea5 100644 --- a/broadcast/broadcast.go +++ b/broadcast/broadcast.go @@ -3,6 +3,9 @@ package broadcast import ( "context" "fmt" + "log" + "strings" + "sync" "time" coretypes "github.com/cometbft/cometbft/rpc/core/types" @@ -81,11 +84,14 @@ func Loop( failedTxs = 0 responseCodes = make(map[uint32]int) sequence := txParams.Sequence + startTime := time.Now() // Log the start of broadcasting for this position - LogVisualizerDebug(fmt.Sprintf("Starting broadcasts for position %d (batchSize: %d)", position, batchSize)) + log.Printf("[POS-%d] Starting broadcasts for position %d (batchSize: %d, node: %s)", + position, position, batchSize, txParams.NodeURL) for i := 0; i < batchSize; i++ { + txStartTime := time.Now() currentSequence := sequence metrics := &TimingMetrics{ PrepStart: time.Now(), @@ -96,42 +102,92 @@ func Loop( metrics.BroadStart = time.Now() resp, _, err := SendTransactionViaRPC(context.Background(), txParams, currentSequence) metrics.Complete = time.Now() + txDuration := metrics.Complete.Sub(txStartTime) // Calculate total transaction time for visualization txLatency := metrics.Complete.Sub(metrics.PrepStart) if err != nil { + // More detailed error logging + errMsg := err.Error() + if len(errMsg) > 100 { + errMsg = errMsg[:100] + "..." // Truncate long errors + } + + log.Printf("[POS-%d] TX FAILED seq=%d node=%s error=\"%s\" time=%v", + position, currentSequence, txParams.NodeURL, errMsg, txDuration) + metrics.LogTiming(currentSequence, false, err) failedTxs++ // Update visualizer with failed tx UpdateVisualizerStats(0, 1, txLatency) + // Handle specific error types if resp != nil && resp.Code == 32 { + // Sequence mismatch newSeq, success, newResp := handleSequenceMismatch(txParams, position, sequence, err) sequence = newSeq if success { successfulTxs++ responseCodes[newResp.Code]++ + log.Printf("[POS-%d] Recovered from sequence mismatch! New seq=%d", position, newSeq) // Update visualizer with successful tx after sequence recovery UpdateVisualizerStats(1, 0, metrics.Complete.Sub(metrics.PrepStart)) } continue } + + // For other specific error codes, check if we should continue or abort this batch + if resp != nil { + log.Printf("[POS-%d] Response code %d from node %s", position, resp.Code, txParams.NodeURL) + responseCodes[resp.Code]++ + + // If we get multiple fee errors in a row, we might need to adjust fees + if resp.Code == 13 { // Insufficient fee error code + if i < 3 { // Only log aggressive warning for early failures + log.Printf("[POS-%d] WARNING: Insufficient fee error from node %s. Consider increasing fees.", + position, txParams.NodeURL) + } + } + } + continue } metrics.LogTiming(currentSequence, true, nil) + log.Printf("[POS-%d] TX SUCCESS seq=%d node=%s code=%d time=%v", + position, currentSequence, txParams.NodeURL, resp.Code, txDuration) + successfulTxs++ responseCodes[resp.Code]++ sequence++ // Update visualizer with successful tx UpdateVisualizerStats(1, 0, txLatency) + + // Add a small delay between transactions to avoid overwhelming nodes + // This helps with mempool congestion + if i < batchSize-1 { + time.Sleep(5 * time.Millisecond) + } + } + + // Calculate success rate and total time + totalTxs := successfulTxs + failedTxs + successRate := 0.0 + if totalTxs > 0 { + successRate = float64(successfulTxs) / float64(totalTxs) * 100 } + totalTime := time.Since(startTime) + txPerSecond := float64(totalTxs) / totalTime.Seconds() + + // Log the completion of broadcasting with detailed stats + log.Printf("[POS-%d] BATCH COMPLETE: %d/%d successful (%.1f%%), %.1f tx/sec, time=%v, node=%s, codes=%v", + position, successfulTxs, totalTxs, successRate, txPerSecond, totalTime, txParams.NodeURL, responseCodes) - // Log the completion of broadcasting for this position + // Log the completion of broadcasting for visualization LogVisualizerDebug(fmt.Sprintf("Completed broadcasts for position %d: %d successful, %d failed", position, successfulTxs, failedTxs)) @@ -167,3 +223,380 @@ func handleSequenceMismatch(txParams types.TransactionParams, position int, sequ metrics.LogTiming(expectedSeq, true, nil) return expectedSeq + 1, true, resp } + +// ParallelBroadcast broadcasts transactions in parallel across multiple nodes +// This dramatically improves throughput by utilizing all available RPC nodes +func ParallelBroadcast( + accounts []types.Account, + config types.Config, + chainID string, + txsPerAccount int, +) (int, int) { + // Validate inputs + if len(accounts) == 0 { + log.Printf("Error: No accounts provided for parallel broadcast") + return 0, 0 + } + + if len(config.Nodes.RPC) == 0 { + log.Printf("Error: No RPC nodes configured for parallel broadcast") + return 0, 0 + } + + startTime := time.Now() + log.Printf("šŸš€ Starting ultra-high-speed parallel broadcast across %d accounts and %d nodes", + len(accounts), len(config.Nodes.RPC)) + log.Printf("Transaction type: %s", config.MsgType) + + // Track nodes that are known to be down + badNodesMap := make(map[string]bool) + var badNodesMutex sync.RWMutex + + // Get healthy nodes only + healthyNodes := GetHealthyNodesOnly(config.Nodes.RPC) + if len(healthyNodes) == 0 { + log.Printf("Warning: No healthy nodes found, will attempt to use all configured nodes") + healthyNodes = config.Nodes.RPC + } + + // Create position to node mapping + positionToNode := make(map[int]string) + + // Create a mapping of account address to Account for easy lookup + accountMap := make(map[string]types.Account) + for _, acct := range accounts { + accountMap[acct.Address] = acct + } + + // Track sequence numbers per node per account + type accountNodeKey struct { + address string + nodeURL string + } + sequenceMap := make(map[accountNodeKey]uint64) + var sequenceMutex sync.RWMutex + + // Track destination addresses for IBC transactions if needed + destinationAddresses := make(map[string]string) + + // Cache for IBC channel info + ibcChannelCache := make(map[string]string) + var ibcCacheMutex sync.RWMutex + + // Create synchronization primitives + var wg sync.WaitGroup + var successMutex sync.Mutex + totalSuccessful := 0 + totalFailed := 0 + + // Increase parallelism substantially for higher throughput + maxConcurrentPositions := len(healthyNodes) * 6 // 6 positions per node for even more concurrency + if maxConcurrentPositions > len(accounts) { + maxConcurrentPositions = len(accounts) + } + + log.Printf("Using %d concurrent transaction senders", maxConcurrentPositions) + + // Use a worker pool model for maximum throughput + positionsToProcess := make(chan int, len(accounts)) + + // Fill the channel with positions to process + for pos := 0; pos < len(accounts); pos++ { + positionsToProcess <- pos + } + close(positionsToProcess) + + // Launch worker pool + for i := 0; i < maxConcurrentPositions; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + // Each worker processes positions from the channel + for position := range positionsToProcess { + // Assign this position to a node + nodeIdx := position % len(healthyNodes) + nodeURL := healthyNodes[nodeIdx] + + // Store the assignment + positionToNode[position] = nodeURL + + // Get the account for this position + account := accounts[position] + + // Get or initialize sequence + sequenceMutex.RLock() + key := accountNodeKey{address: account.Address, nodeURL: nodeURL} + sequence, hasSequence := sequenceMap[key] + sequenceMutex.RUnlock() + + if !hasSequence { + // Get account info using the assigned node + initialSeq, _, err := lib.GetAccountInfo(account.Address, config) + if err != nil { + log.Printf("[Worker %d] Failed to get account info for %s from node %s: %v", + workerID, account.Address, nodeURL, err) + + // Mark this node as bad + badNodesMutex.Lock() + badNodesMap[nodeURL] = true + badNodesMutex.Unlock() + + // Try to find a new node + badNodesMutex.RLock() + availableNodes := make([]string, 0) + for _, node := range healthyNodes { + if !badNodesMap[node] { + availableNodes = append(availableNodes, node) + } + } + badNodesMutex.RUnlock() + + if len(availableNodes) == 0 { + log.Printf("[Worker %d] No available nodes, skipping position %d", workerID, position) + continue + } + + // Reassign to a new node + newNodeIdx := position % len(availableNodes) + nodeURL = availableNodes[newNodeIdx] + positionToNode[position] = nodeURL + + // Try again with the new node + initialSeq, _, err = lib.GetAccountInfo(account.Address, config) + if err != nil { + log.Printf("[Worker %d] Still failed to get account info after node reassignment: %v", workerID, err) + continue + } + } + + // Store the sequence + sequenceMutex.Lock() + sequenceMap[key] = initialSeq + sequenceMutex.Unlock() + + sequence = initialSeq + } + + // Prepare transaction parameters + txParams := types.TransactionParams{ + Config: config, + NodeURL: nodeURL, + ChainID: chainID, + PrivKey: account.PrivKey, + AcctAddress: account.Address, + AccNum: 0, // Will be fetched during tx building + Sequence: sequence, + MsgType: config.MsgType, + MsgParams: map[string]interface{}{ + "from_address": account.Address, + "amount": 1000, // Starting with a very low amount + "denom": config.Denom, + }, + } + + // Set up transaction parameters based on message type + switch config.MsgType { + case "bank_send": + // Use the next account as receiver, wrap around if needed + receiverIdx := (position + 1) % len(accounts) + receiver := accounts[receiverIdx].Address + txParams.MsgParams["to_address"] = receiver + + case "multisend": + // Set up multisend parameters + // Send to multiple receivers (up to 10) + recipients := make([]map[string]interface{}, 0, 5) + for i := 1; i <= 5; i++ { + recipientIdx := (position + i) % len(accounts) + recipient := accounts[recipientIdx].Address + recipients = append(recipients, map[string]interface{}{ + "address": recipient, + "amount": 200, // Send a small amount to each recipient + "denom": config.Denom, + }) + } + txParams.MsgParams["recipients"] = recipients + + case "ibc_transfer": + // Check if we have a cached destination address + destAddr, exists := destinationAddresses[account.Address] + if !exists { + // Generate a random destination address if not using real ones + destAddr = fmt.Sprintf("cosmos1random%d", position) + destinationAddresses[account.Address] = destAddr + } + + // Get or determine IBC channel + ibcCacheMutex.RLock() + channelID, channelExists := ibcChannelCache[config.Chain] + ibcCacheMutex.RUnlock() + + if !channelExists { + // Let's use either a configured channel or a reasonable default + channelID = "channel-0" // Default fallback + + // Cache it + ibcCacheMutex.Lock() + ibcChannelCache[config.Chain] = channelID + ibcCacheMutex.Unlock() + } + + // Set up IBC transfer parameters + txParams.MsgParams["to_address"] = destAddr + txParams.MsgParams["source_channel"] = channelID + txParams.MsgParams["source_port"] = "transfer" + txParams.MsgParams["timeout_height"] = "0-0" // No timeout height + txParams.MsgParams["timeout_timestamp"] = time.Now().Add(10 * time.Minute).UnixNano() // 10 min timeout + } + + // Send transactions aggressively + successful := 0 + failed := 0 + + // Launch multiple concurrent transactions for this position + var txWg sync.WaitGroup + const concurrentTxsPerPosition = 10 // Increased from 5 to 10 for more speed + + for batch := 0; batch < txsPerAccount; batch += concurrentTxsPerPosition { + // Calculate how many transactions to send in this batch + remainingTxs := txsPerAccount - batch + batchSize := concurrentTxsPerPosition + if remainingTxs < batchSize { + batchSize = remainingTxs + } + + if batchSize <= 0 { + break + } + + // Send transactions in this batch concurrently + for txOffset := 0; txOffset < batchSize; txOffset++ { + txWg.Add(1) + + go func(txNum int) { + defer txWg.Done() + + // Create a local copy of transaction parameters + localParams := txParams + + // Calculate sequence for this transaction + sequenceMutex.RLock() + currentSequence := sequenceMap[key] + sequenceMutex.RUnlock() + + // Update sequence for this tx + localParams.Sequence = currentSequence + uint64(txNum) + + // Create a timeout context for this transaction - shorter timeout for faster feedback + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + // Send the transaction + resp, err := SendTx(ctx, localParams, localParams.Sequence, config.BroadcastMode) + + if err != nil { + successMutex.Lock() + failed++ + totalFailed++ + successMutex.Unlock() + + // Only log errors occasionally to reduce spam + if failed%20 == 0 { + log.Printf("[Worker %d] Position %d: Error on tx %d: %v", + workerID, position, txNum, err) + } + + // Check if this was a node failure + if strings.Contains(err.Error(), "context deadline exceeded") || + strings.Contains(err.Error(), "connection refused") || + strings.Contains(err.Error(), "EOF") { + // Mark this node as bad + badNodesMutex.Lock() + badNodesMap[nodeURL] = true + badNodesMutex.Unlock() + } + + return + } + + // Update node performance metrics + UpdateNodePerformance(nodeURL, true, time.Since(startTime)) + + if resp != nil && resp.Code == 0 { + successMutex.Lock() + successful++ + totalSuccessful++ + successMutex.Unlock() + + // Update sequence + sequenceMutex.Lock() + if localParams.Sequence >= sequenceMap[key] { + sequenceMap[key] = localParams.Sequence + 1 + } + sequenceMutex.Unlock() + + // Only log successes occasionally to reduce spam + if successful%100 == 0 { + successRate := float64(successful) / float64(successful+failed) * 100 + log.Printf("[Worker %d] Position %d: %d/%d successful (%.1f%%)", + workerID, position, successful, successful+failed, successRate) + } + } else { + successMutex.Lock() + failed++ + totalFailed++ + successMutex.Unlock() + } + }(txOffset) + } + + // Wait a very short time between batches to avoid overwhelming the node + // Reduced from 10ms to 5ms for more speed + time.Sleep(5 * time.Millisecond) + } + + // Wait for all transactions for this position to complete + txWg.Wait() + + // Log completion for this position + successRate := 0.0 + if successful+failed > 0 { + successRate = float64(successful) / float64(successful+failed) * 100 + } + + log.Printf("[Worker %d] Position %d complete: %d successful, %d failed (%.1f%%)", + workerID, position, successful, failed, successRate) + } + }(i) + } + + // Wait for all goroutines to complete + wg.Wait() + + // Calculate duration and transactions per second + duration := time.Since(startTime) + var tps float64 + if duration.Seconds() > 0 { + tps = float64(totalSuccessful) / duration.Seconds() + } + + // Log overall statistics + log.Printf("šŸš€ Ultra high-speed parallel broadcast complete: %d successful, %d failed in %v (%.2f TPS)", + totalSuccessful, totalFailed, duration, tps) + + // Log per-node success rates + log.Printf("Node performance summary:") + for _, node := range config.Nodes.RPC { + perf := GetOrCreateNodePerformance(node) + total := perf.SuccessCount + perf.FailureCount + successRate := 0.0 + if total > 0 { + successRate = float64(perf.SuccessCount) / float64(total) * 100 + } + log.Printf(" Node %s: %d/%d txs (%.1f%%), avg latency: %v", + node, perf.SuccessCount, total, successRate, perf.AverageLatency) + } + + return totalSuccessful, totalFailed +} diff --git a/broadcast/transaction.go b/broadcast/transaction.go index 606b9f2..391a306 100644 --- a/broadcast/transaction.go +++ b/broadcast/transaction.go @@ -5,12 +5,15 @@ import ( "encoding/json" "errors" "fmt" + "log" "regexp" + "sort" "strconv" "strings" "sync" "time" + "github.com/somatic-labs/meteorite/lib" types "github.com/somatic-labs/meteorite/types" sdkmath "cosmossdk.io/math" @@ -18,9 +21,11 @@ import ( sdkclient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/tx" sdk "github.com/cosmos/cosmos-sdk/types" - signing "github.com/cosmos/cosmos-sdk/types/tx/signing" + signingtypes "github.com/cosmos/cosmos-sdk/types/tx/signing" xauthsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" + ibctransfertypes "github.com/cosmos/ibc-go/v8/modules/apps/transfer/types" + clienttypes "github.com/cosmos/ibc-go/v8/modules/core/02-client/types" ) const ( @@ -34,9 +39,20 @@ const ( minGasWasmExecute = 300000 minGasDefault = 100000 - // Error patterns - insufficientFeesPattern = "insufficient fees; got: \\d+\\w+ required: (\\d+)\\w+" + // Error patterns - expanded to catch more variations + insufficientFeesPattern = "insufficient fees; got: \\d+\\w+ required: (\\d+)\\w+" + insufficientFeesAltPattern1 = "got: [^;]+; required: (\\d+)\\w+" + insufficientFeesAltPattern2 = "required: (\\d+)\\w+" + + // Sequence error patterns sequenceMismatchPattern = "account sequence mismatch: expected (\\d+), got \\d+" + sequenceAltPattern1 = "expected sequence: (\\d+)" + sequenceAltPattern2 = "sequence (\\d+) but got \\d+" + sequenceAltPattern3 = "expected (\\d+), got \\d+" + sequenceAltPattern4 = "sequence \\d+ != (\\d+)" + + // Constants for error message patterns + maxErrorQueueSize = 10 // Max number of recent error messages to store ) // multiSendDistributor defines the interface for creating MultiSend messages @@ -44,6 +60,39 @@ type multiSendDistributor interface { CreateDistributedMultiSendMsg(fromAddress string, msgParams types.MsgParams, seed int64) (sdk.Msg, string, error) } +// ErrorQueue stores recent error messages for pattern matching +type ErrorQueue struct { + messages []string + mutex sync.RWMutex +} + +// Global instance of the error queue +var regexErrorQueue = &ErrorQueue{ + messages: make([]string, 0, maxErrorQueueSize), +} + +// AddError adds an error message to the queue +func (q *ErrorQueue) AddError(errMsg string) { + q.mutex.Lock() + defer q.mutex.Unlock() + + // Add new error message + q.messages = append(q.messages, errMsg) + + // Trim queue if necessary + if len(q.messages) > maxErrorQueueSize { + q.messages = q.messages[len(q.messages)-maxErrorQueueSize:] + } +} + +// GetErrorString returns a concatenated string of all recent errors for pattern matching +func (q *ErrorQueue) GetErrorString() string { + q.mutex.RLock() + defer q.mutex.RUnlock() + + return strings.Join(q.messages, " ") +} + // convertMapToMsgParams converts a map of parameters to the MsgParams struct func convertMapToMsgParams(paramsMap map[string]interface{}) types.MsgParams { msgParams := types.MsgParams{} @@ -77,20 +126,63 @@ func createBankSendMsg(txParams *types.TxParams) (sdk.Msg, error) { return nil, errors.New("from_address is required for bank_send") } - toAddrStr, ok := txParams.MsgParams["to_address"].(string) - if !ok || toAddrStr == "" { - return nil, errors.New("to_address is required for bank_send") - } - - // Parse addresses + // Parse from address fromAddr, err := sdk.AccAddressFromBech32(fromAddrStr) if err != nil { return nil, fmt.Errorf("invalid from address: %w", err) } + // Get to_address from params + toAddrStr, ok := txParams.MsgParams["to_address"].(string) + if !ok || toAddrStr == "" { + // If to_address is not specified, get a random one from balances.csv + // First determine the prefix to use - either from config or extract from from_address + prefix := "atone" // Default to atone prefix + if txParams.Config.AccountPrefix != "" { + prefix = txParams.Config.AccountPrefix + log.Printf("Using account prefix from config: %s", prefix) + } else if txParams.Config.Prefix != "" { + prefix = txParams.Config.Prefix + log.Printf("Using prefix from config: %s", prefix) + } else { + // Extract prefix from fromAddrStr + parts := strings.Split(fromAddrStr, "1") + if len(parts) > 0 && parts[0] != "" { + prefix = parts[0] + log.Printf("Extracted prefix from from_address: %s", prefix) + } else { + log.Printf("Could not extract prefix from from_address, using default prefix: %s", prefix) + } + } + + // Get address manager and try to get a random address + addressManager := lib.GetAddressManager() + + // First try to load addresses if needed + if err := addressManager.LoadAddressesFromCSV(); err != nil { + log.Printf("Warning: Failed to load addresses from balances.csv: %v", err) + } + + randomAddr, err := addressManager.GetRandomAddressWithPrefix(prefix) + if err != nil { + log.Printf("Failed to get random address from balances.csv: %v", err) + return nil, fmt.Errorf("to_address is required for bank_send and failed to get random address: %w", err) + } + + if randomAddr == "" { + log.Printf("Got empty random address from balances.csv") + return nil, errors.New("failed to get valid to_address from balances.csv") + } + + toAddrStr = randomAddr + log.Printf("Using random address from balances.csv with prefix %s: %s", prefix, toAddrStr) + } + + // Parse to address toAddr, err := sdk.AccAddressFromBech32(toAddrStr) if err != nil { - return nil, fmt.Errorf("invalid to address: %w", err) + log.Printf("Error parsing to_address %s: %v", toAddrStr, err) + return nil, fmt.Errorf("invalid to address %s: %w", toAddrStr, err) } // Extract amount @@ -123,11 +215,16 @@ func createBankSendMsg(txParams *types.TxParams) (sdk.Msg, error) { coins := sdk.NewCoins(coin) // Create bank send message - return &banktypes.MsgSend{ + msg := &banktypes.MsgSend{ FromAddress: fromAddr.String(), ToAddress: toAddr.String(), Amount: coins, - }, nil + } + + log.Printf("Created bank send message from %s to %s with amount %d %s", + fromAddr.String(), toAddr.String(), amount, denom) + + return msg, nil } // createMultiSendMsg creates a bank multisend message from the provided parameters @@ -138,8 +235,54 @@ func createMultiSendMsg(txParams *types.TxParams) (sdk.Msg, error) { // createIbcTransferMsg creates an IBC transfer message func createIbcTransferMsg(txParams *types.TxParams) (sdk.Msg, error) { - // Just a stub for now - return nil, errors.New("ibc_transfer not implemented") + // Get necessary parameters from txParams + fromAddress, ok := txParams.MsgParams["from_address"].(string) + if !ok || fromAddress == "" { + return nil, errors.New("from_address not specified") + } + + toAddress, ok := txParams.MsgParams["to_address"].(string) + if !ok || toAddress == "" { + return nil, errors.New("to_address not specified") + } + + // Check if amount is specified + amount, ok := txParams.MsgParams["amount"].(int64) + if !ok || amount <= 0 { + return nil, errors.New("invalid amount") + } + + // Get denom from config + denom := txParams.Config.Denom + if denom == "" { + return nil, errors.New("denom not specified in config") + } + + // Create a coin with the amount and denom + coin := sdk.NewCoin(denom, sdkmath.NewInt(amount)) + + // Use a default source port and channel if not specified + sourcePort := "transfer" + sourceChannel := "channel-0" // Default channel + + // Check if channel is specified in the config + if txParams.Config.Channel != "" { + sourceChannel = txParams.Config.Channel + } + + // For IBC transfers, we need to use the IBC Transfer module's MsgTransfer + // This uses ibc-go v8 compatible imports + transferMsg := &ibctransfertypes.MsgTransfer{ + SourcePort: sourcePort, + SourceChannel: sourceChannel, + Token: coin, + Sender: fromAddress, + Receiver: toAddress, + TimeoutHeight: clienttypes.Height{}, + TimeoutTimestamp: 0, // No timeout + } + + return transferMsg, nil } // createStoreCodeMsg creates a store code message for CosmWasm @@ -189,7 +332,8 @@ func BuildTransaction(ctx context.Context, txParams *types.TxParams) ([]byte, er fromAddress, _ := txParams.MsgParams["from_address"].(string) accNum, accSeq, err := GetAccountInfo(ctx, clientCtx, fromAddress) if err != nil { - return nil, fmt.Errorf("failed to get account info: %w", err) + return nil, fmt.Errorf("failed to get account info for %s at node %s: %w", + fromAddress, txParams.NodeURL, err) } // Choose gas limit based on message type, gas settings, and node capabilities @@ -272,7 +416,7 @@ func BuildTransaction(ctx context.Context, txParams *types.TxParams) ([]byte, er // Create the signature sigV2, err := tx.SignWithPrivKey( ctx, - signing.SignMode_SIGN_MODE_DIRECT, + signingtypes.SignMode_SIGN_MODE_DIRECT, xauthsigning.SignerData{ ChainID: txParams.ChainID, AccountNumber: accNum, @@ -597,17 +741,28 @@ func BuildAndSignTransaction( sequence uint64, _ interface{}, ) ([]byte, error) { + startTime := time.Now() + log.Printf("=== Building transaction for %s ===", txParams.AcctAddress) + // First, check if we have more up-to-date sequence info for this node from previous errors + // Only use local node sequence tracking, don't coordinate across nodes nodeSettings := getNodeSettings(txParams.NodeURL) nodeSettings.mutex.RLock() if nodeSettings.LastSequence > sequence { // Log that we're using the cached sequence from previous error responses - fmt.Printf("Using cached sequence %d instead of provided %d for node %s (from previous tx errors)\n", + log.Printf("Using cached sequence %d instead of provided %d for node %s (local tracking only)", nodeSettings.LastSequence, sequence, txParams.NodeURL) sequence = nodeSettings.LastSequence } nodeSettings.mutex.RUnlock() + // If we have no sequence yet, always start with 1 (not 0) and let the mempool tell us the correct value + if sequence == 0 { + sequence = 1 + log.Printf("Starting with sequence 1 for account on node %s (will learn from mempool)", + txParams.NodeURL) + } + // We need to ensure the passed-in sequence is used txp := &types.TxParams{ Config: txParams.Config, @@ -618,6 +773,20 @@ func BuildAndSignTransaction( MsgParams: txParams.MsgParams, } + // Log the message parameters for debugging + logMsgParams := make(map[string]interface{}) + for k, v := range txParams.MsgParams { + logMsgParams[k] = v + } + // Don't log sensitive data + if _, ok := logMsgParams["private_key"]; ok { + logMsgParams["private_key"] = "[REDACTED]" + } + + paramsJson, _ := json.Marshal(logMsgParams) + log.Printf("Transaction parameters: %s", string(paramsJson)) + log.Printf("Message type: %s", txParams.MsgType) + // Pass distributor through MsgParams for multisend operations if txParams.Distributor != nil && txParams.MsgType == "bank_multisend" { if txp.MsgParams == nil { @@ -626,15 +795,30 @@ func BuildAndSignTransaction( txp.MsgParams["distributor"] = txParams.Distributor } - // Use ClientContext with correct sequence - clientCtx, err := GetClientContext(txParams.Config, txParams.NodeURL) + // Use ClientContext with correct sequence - retry up to 3 times on failure + var clientCtx sdkclient.Context + var err error + maxRetries := 3 + + for i := 0; i < maxRetries; i++ { + clientCtx, err = GetClientContext(txParams.Config, txParams.NodeURL) + if err == nil { + break + } + log.Printf("Warning: Failed to get client context on attempt %d: %v", i+1, err) + if i < maxRetries-1 { + time.Sleep(100 * time.Millisecond) + } + } + if err != nil { - return nil, fmt.Errorf("failed to get client context: %w", err) + return nil, fmt.Errorf("failed to create client context after %d attempts: %w", maxRetries, err) } // Build and sign the transaction msg, err := createMessage(txp) if err != nil { + log.Printf("Error creating message: %v", err) return nil, fmt.Errorf("failed to create message: %w", err) } @@ -643,142 +827,66 @@ func BuildAndSignTransaction( // Set the message and other transaction parameters if err := txBuilder.SetMsgs(msg); err != nil { + log.Printf("Error setting messages: %v", err) return nil, fmt.Errorf("failed to set messages: %w", err) } // Check if there's a pre-calculated gas amount for multisend transactions var gasLimit uint64 if calculatedGas, ok := txp.MsgParams["calculated_gas_amount"].(uint64); ok && calculatedGas > 0 { - // Use the pre-calculated gas amount for multisend gasLimit = calculatedGas - fmt.Printf("Using pre-calculated gas amount for multisend: %d\n", gasLimit) } else { - // Estimate gas through simulation - simulatedGas, err := DetermineOptimalGas(ctx, clientCtx, tx.Factory{}, 1.3, msg) - if err != nil { - // Use default if simulation fails - gasLimit = uint64(txParams.Config.BaseGas) - fmt.Printf("Gas simulation failed, using default gas: %d\n", gasLimit) - } else { - gasLimit = simulatedGas - } + // Default to message type specific gas limit + gasLimit = getDefaultGasLimitByMsgType(txp.MsgType) } - txBuilder.SetGasLimit(gasLimit) - - // Set fee - get the gas price from config - gasPrice := txParams.Config.Gas.Low - - // For zero gas price, check if we should use adaptive gas pricing - if gasPrice == 0 { - gsm := GetGasStrategyManager() - caps := gsm.GetNodeCapabilities(txParams.NodeURL) - if !caps.AcceptsZeroGas { - // Use low non-zero gas price if node doesn't support zero gas - gasPrice = 1 - fmt.Printf("Node %s may not support zero gas, using gas price: %d\n", - txParams.NodeURL, gasPrice) - } - } + // Start with a low initial gas price to let us learn from errors + gasPrice := int64(1) // Start very low and let the RPC tell us if we need more - // Calculate initial fee amount + // Calculate fee amount based on gas limit and price feeAmount := calculateInitialFee(gasLimit, gasPrice) - // Important: Check if we have a stored minimum fee for this node and use it if higher - nodeSettings.mutex.RLock() - if minFee, exists := nodeSettings.MinimumFees[txParams.Config.Denom]; exists { - if uint64(feeAmount) < minFee { - fmt.Printf("Using node-specific minimum fee %d instead of calculated %d for %s\n", - minFee, feeAmount, txParams.NodeURL) - feeAmount = int64(minFee) - } - } - nodeSettings.mutex.RUnlock() - - // Apply a more aggressive minimum fee strategy to prevent "insufficient fees" errors - // For nodes that have previously returned fee errors, add a buffer - baseFeeThreshold := int64(200) - - // For large multisend transactions, ensure the fee is proportionally higher - if txParams.MsgType == "bank_multisend" && gasLimit > 100000 { - // Much higher minimum fee for large multisends - use gas-based calculation - minFee := int64(gasLimit) / 5000 // More aggressive scaling (changed from 10000) - if feeAmount < minFee { - feeAmount = minFee - fmt.Printf("Increasing multisend fee to %d based on gas usage\n", feeAmount) - } - } else if gasPrice > 0 && feeAmount < baseFeeThreshold { - // For regular transactions, use higher minimum fee - feeAmount = baseFeeThreshold - fmt.Printf("Using minimum fee threshold of %d\n", feeAmount) + // Set a very low minimum fee to start with + minFeeAmount := int64(1) // Start with minimum possible fee + if feeAmount < minFeeAmount { + log.Printf("Using minimum initial fee of %d", minFeeAmount) + feeAmount = minFeeAmount } - // Apply additional node-specific fee buffer based on historical errors - // This helps prevent fee errors on chains that are sensitive to fee amounts + // Apply node-specific fee buffers based on history (these are learned from errors) + // This will increase fees automatically if we've learned from previous errors feeAmount = applyNodeFeeBuffer(txParams.NodeURL, txParams.Config.Denom, feeAmount) - feeCoin := fmt.Sprintf("%d%s", feeAmount, txParams.Config.Denom) - fee, err := sdk.ParseCoinsNormalized(feeCoin) - if err != nil { - return nil, fmt.Errorf("failed to parse fee: %w", err) - } + // Create the fee + fee := sdk.NewCoins(sdk.NewCoin(txParams.Config.Denom, sdkmath.NewInt(feeAmount))) + log.Printf("Setting gas limit to %d and fee to %d %s", gasLimit, feeAmount, txParams.Config.Denom) - fmt.Printf("Setting transaction fee: %s (gas limit: %d, gas price: %d) for node %s\n", - fee.String(), gasLimit, gasPrice, txParams.NodeURL) + txBuilder.SetGasLimit(gasLimit) txBuilder.SetFeeAmount(fee) - // Set memo if provided - txBuilder.SetMemo("") - - // Get account number - this is still needed, but we won't use its sequence - accNum := txParams.AccNum - fromAddress, _ := txParams.MsgParams["from_address"].(string) - - // We only need account number from GetAccountInfo, NOT the sequence - // The sequence we use should be from our tracking system, which considers mempool state - fetchedAccNum, stateSequence, err := GetAccountInfo(ctx, clientCtx, fromAddress) - if err != nil { - return nil, fmt.Errorf("failed to get account info: %w", err) - } - - // Use the fetched account number - accNum = fetchedAccNum - - // Only log the state sequence for debugging, but don't use it directly - // This helps us understand discrepancies between state and our tracked sequences - fmt.Printf("Node %s reports state sequence %d, using tracked sequence %d\n", - txParams.NodeURL, stateSequence, sequence) - - // If we have no better information (first tx to this node), then use state sequence - if sequence == 0 { - sequence = stateSequence - fmt.Printf("First transaction to node %s, using state sequence: %d\n", txParams.NodeURL, sequence) - updateSequence(txParams.NodeURL, sequence) - } - - // Set up signature - sigV2 := signing.SignatureV2{ - PubKey: txParams.PubKey, - Sequence: sequence, - Data: &signing.SingleSignatureData{ - SignMode: signing.SignMode_SIGN_MODE_DIRECT, + // Prepare an empty signature to get the correct size + sigV2 := signingtypes.SignatureV2{ + PubKey: txParams.PrivKey.PubKey(), + Data: &signingtypes.SingleSignatureData{ + SignMode: signingtypes.SignMode_SIGN_MODE_DIRECT, }, } if err := txBuilder.SetSignatures(sigV2); err != nil { + log.Printf("Error setting signatures: %v", err) return nil, fmt.Errorf("failed to set signatures: %w", err) } signerData := xauthsigning.SignerData{ ChainID: txParams.ChainID, - AccountNumber: accNum, + AccountNumber: txParams.AccNum, Sequence: sequence, } // Sign the transaction with the private key sigV2, err = tx.SignWithPrivKey( ctx, - signing.SignMode_SIGN_MODE_DIRECT, + signingtypes.SignMode_SIGN_MODE_DIRECT, signerData, txBuilder, txParams.PrivKey, @@ -786,22 +894,30 @@ func BuildAndSignTransaction( sequence, ) if err != nil { + log.Printf("Error signing transaction: %v", err) return nil, fmt.Errorf("failed to sign transaction: %w", err) } // Set the signed signature if err := txBuilder.SetSignatures(sigV2); err != nil { + log.Printf("Error setting final signatures: %v", err) return nil, fmt.Errorf("failed to set signatures: %w", err) } // Encode the transaction txBytes, err := clientCtx.TxConfig.TxEncoder()(txBuilder.GetTx()) if err != nil { + log.Printf("Error encoding transaction: %v", err) return nil, fmt.Errorf("failed to encode transaction: %w", err) } - // If successful, update our node sequence cache preemptively + // Record the time we built this transaction + log.Printf("Successfully built transaction for %s with sequence %d and fee %d %s in %v", + txParams.AcctAddress, sequence, feeAmount, txParams.Config.Denom, time.Since(startTime)) + + // If successful, update only our local node sequence cache preemptively // This helps avoid sequence errors on subsequent transactions to the same node + // but doesn't coordinate across nodes to maintain divergent mempools updateSequence(txParams.NodeURL, sequence+1) return txBytes, nil @@ -814,13 +930,13 @@ func applyNodeFeeBuffer(nodeURL, denom string, baseFee int64) int64 { nodeSettings.mutex.RLock() defer nodeSettings.mutex.RUnlock() - // Default buffer is 10% - buffer := 1.1 + // Default buffer is 5% + buffer := 1.05 - // If we've had fee errors from this node before, use a more aggressive buffer + // If we've had fee errors from this node before, use a slightly higher buffer if minFee, exists := nodeSettings.MinimumFees[denom]; exists && minFee > 0 { - // Apply a 20% buffer over the known minimum - buffer = 1.2 + // Apply a 10% buffer over the known minimum + buffer = 1.1 // Ensure we're at least meeting the known minimum fee (with buffer) minWithBuffer := int64(float64(minFee) * buffer) @@ -835,79 +951,168 @@ func applyNodeFeeBuffer(nodeURL, denom string, baseFee int64) int64 { return int64(float64(baseFee) * buffer) } -// ProcessBroadcastResponse processes the response from a transaction broadcast -// and updates node-specific settings based on errors +// ProcessBroadcastResponse processes a broadcast response to extract sequence numbers and fee requirements func ProcessBroadcastResponse(nodeURL, denom string, sequence uint64, respBytes []byte) { - // Check if there's an error response to parse + // If no response bytes, just return if len(respBytes) == 0 { return } - respStr := string(respBytes) + // Get the full error string for pattern matching + errorStr := string(respBytes) + + // Track the error message in our regex queue for later analysis + regexErrorQueue.AddError(errorStr) + + // Check for sequence mismatch errors using our helper function + if correctSeq := extractSequenceFromError(errorStr); correctSeq > 0 { + // Got a specific sequence, update our tracking + if correctSeq > sequence { + log.Printf("FEE DEBUG: Node %s sequence updated from %d to %d (from error message)", + nodeURL, sequence, correctSeq) + updateSequence(nodeURL, correctSeq) + } + } + + // Check for insufficient fee errors + if requiredFee := extractFeeFromError(errorStr, denom); requiredFee > 0 { + // Apply a small buffer on top of the required fee to ensure success next time + minFee := uint64(float64(requiredFee) * 1.1) + + log.Printf("FEE ADJUSTMENT: Required fee is %d%s, setting node minimum to %d%s for future transactions", + requiredFee, denom, minFee, denom) + + // Update the minimum fee for this node/denom combination + updateMinimumFee(nodeURL, denom, minFee) + } + + // Check for out of gas errors + if strings.Contains(errorStr, "out of gas") { + log.Printf("OUT OF GAS: Transaction at node %s failed - consider increasing gas limits", nodeURL) + } +} - // Check for common error patterns in the response +// extractSequenceFromError extracts the sequence number from an error message +func extractSequenceFromError(errorStr string) uint64 { + // Try multiple regex patterns to extract sequence information - // 1. Check for sequence mismatch errors + // Primary pattern: "account sequence mismatch: expected X, got Y" sequenceRegex := regexp.MustCompile(sequenceMismatchPattern) - if matches := sequenceRegex.FindStringSubmatch(respStr); len(matches) > 1 { - correctSeq, _ := strconv.ParseUint(matches[1], 10, 64) + matches := sequenceRegex.FindStringSubmatch(errorStr) + + if len(matches) > 1 { + extractedSeq, err := strconv.ParseUint(matches[1], 10, 64) + if err == nil { + return extractedSeq + } + } + + // Try alternative pattern 1 + altPattern1 := regexp.MustCompile(sequenceAltPattern1) + altMatches1 := altPattern1.FindStringSubmatch(errorStr) + if len(altMatches1) > 1 { + extractedSeq, err := strconv.ParseUint(altMatches1[1], 10, 64) + if err == nil { + return extractedSeq + } + } - // Update our sequence tracking with the correct mempool sequence - updateSequence(nodeURL, correctSeq) - fmt.Printf("MEMPOOL SYNC - Updated sequence for %s: %d (was: %d)\n", - nodeURL, correctSeq, sequence) + // Try alternative pattern 2 + altPattern2 := regexp.MustCompile(sequenceAltPattern2) + altMatches2 := altPattern2.FindStringSubmatch(errorStr) + if len(altMatches2) > 1 { + extractedSeq, err := strconv.ParseUint(altMatches2[1], 10, 64) + if err == nil { + return extractedSeq + } + } - // Return early since this is a critical error to fix - return + // Try alternative pattern 3 + altPattern3 := regexp.MustCompile(sequenceAltPattern3) + altMatches3 := altPattern3.FindStringSubmatch(errorStr) + if len(altMatches3) > 1 { + extractedSeq, err := strconv.ParseUint(altMatches3[1], 10, 64) + if err == nil { + return extractedSeq + } } - // 2. Check for insufficient fees errors - insufficientFeesRegex := regexp.MustCompile(insufficientFeesPattern) - if matches := insufficientFeesRegex.FindStringSubmatch(respStr); len(matches) > 1 { - requiredFee, _ := strconv.ParseUint(matches[1], 10, 64) + // Try alternative pattern 4 + altPattern4 := regexp.MustCompile(sequenceAltPattern4) + altMatches4 := altPattern4.FindStringSubmatch(errorStr) + if len(altMatches4) > 1 { + extractedSeq, err := strconv.ParseUint(altMatches4[1], 10, 64) + if err == nil { + return extractedSeq + } + } - // Add a 20% buffer to the required fee to avoid borderline cases - bufferedFee := uint64(float64(requiredFee) * 1.2) + // No sequence found + return 0 +} - // Update minimum fee for this node - updateMinimumFee(nodeURL, denom, bufferedFee) - fmt.Printf("FEE ADJUSTMENT - Node %s requires minimum %d %s, storing %d with buffer\n", - nodeURL, requiredFee, denom, bufferedFee) +// extractFeeFromError extracts the required fee from an error message +func extractFeeFromError(errorStr string, denom string) uint64 { + // Check for the most common pattern "spendable balance X is smaller than Y" + feePattern := regexp.MustCompile(`spendable balance .* is smaller than (\d+)` + denom) + matches := feePattern.FindStringSubmatch(errorStr) - return + if len(matches) > 1 { + requiredFee, err := strconv.ParseUint(matches[1], 10, 64) + if err == nil { + return requiredFee + } } - // If we got here and there's an "out of gas" error, update gas requirements - if strings.Contains(respStr, "out of gas") { - // This error handling would update gas strategy, but that's handled elsewhere - fmt.Printf("GAS ERROR detected for node %s - Consider increasing gas limits\n", nodeURL) - return + // Try the direct "required: X" pattern + directPattern := regexp.MustCompile(`required: (\d+)` + denom) + directMatches := directPattern.FindStringSubmatch(errorStr) + + if len(directMatches) > 1 { + requiredFee, err := strconv.ParseUint(directMatches[1], 10, 64) + if err == nil { + return requiredFee + } + } + + // Try the "fee < minimum" pattern + minFeePattern := regexp.MustCompile(`fee < minimum \((\d+)` + denom) + minFeeMatches := minFeePattern.FindStringSubmatch(errorStr) + + if len(minFeeMatches) > 1 { + requiredFee, err := strconv.ParseUint(minFeeMatches[1], 10, 64) + if err == nil { + return requiredFee + } } - // If no errors found, this was likely a successful transaction - // We can simply rely on the preemptive sequence update in BuildAndSignTransaction + // No fee found + return 0 } // BroadcastTxSync is a wrapper around the standard broadcast that includes error processing func BroadcastTxSync(ctx context.Context, clientCtx sdkclient.Context, txBytes []byte, nodeURL, denom string, sequence uint64) (*sdk.TxResponse, error) { + // For older versions of Cosmos SDK, BroadcastTxSync is available directly + // For newer versions we use BroadcastTx with appropriate mode resp, err := clientCtx.BroadcastTxSync(txBytes) // Process broadcast response for errors to update our node-specific tracking - // We do this regardless of whether the broadcast itself returned an error if resp != nil { - // Marshal response to bytes for processing respBytes, _ := json.Marshal(resp) ProcessBroadcastResponse(nodeURL, denom, sequence, respBytes) } else if err != nil { - // If we have an error but no response, process the error string + // Add error to the queue for pattern matching + regexErrorQueue.AddError(err.Error()) ProcessBroadcastResponse(nodeURL, denom, sequence, []byte(err.Error())) } return resp, err } -// BroadcastTxAsync is a wrapper around the standard broadcast that includes error processing +// BroadcastTxAsync is a wrapper around async broadcast that includes error processing func BroadcastTxAsync(ctx context.Context, clientCtx sdkclient.Context, txBytes []byte, nodeURL, denom string, sequence uint64) (*sdk.TxResponse, error) { + // In newer versions of the SDK, BroadcastTxAsync is not directly available on clientCtx + // Instead, we use the general BroadcastTx method with the appropriate mode resp, err := clientCtx.BroadcastTxAsync(txBytes) // Process broadcast response for errors to update our node-specific tracking @@ -915,6 +1120,8 @@ func BroadcastTxAsync(ctx context.Context, clientCtx sdkclient.Context, txBytes respBytes, _ := json.Marshal(resp) ProcessBroadcastResponse(nodeURL, denom, sequence, respBytes) } else if err != nil { + // Add error to the queue for pattern matching + regexErrorQueue.AddError(err.Error()) ProcessBroadcastResponse(nodeURL, denom, sequence, []byte(err.Error())) } @@ -932,6 +1139,8 @@ func BroadcastTxBlock(ctx context.Context, clientCtx sdkclient.Context, txBytes respBytes, _ := json.Marshal(resp) ProcessBroadcastResponse(nodeURL, denom, sequence, respBytes) } else if err != nil { + // Add error to the queue for pattern matching + regexErrorQueue.AddError(err.Error()) ProcessBroadcastResponse(nodeURL, denom, sequence, []byte(err.Error())) } @@ -970,69 +1179,472 @@ func BroadcastTx( return resp, err } -// ProcessTxBroadcastResult processes a transaction broadcast result and returns a custom response +// ProcessTxBroadcastResult processes a transaction broadcast result and updates node metrics func ProcessTxBroadcastResult(txResponse *sdk.TxResponse, err error, nodeURL string, sequence uint64) { - // Check if we have a response code indicating error - if txResponse != nil && txResponse.Code != 0 { - // Process the error log to update our node tracking - if txResponse.RawLog != "" { - // Handle specific error cases - ProcessBroadcastResponse(nodeURL, "", sequence, []byte(txResponse.RawLog)) + // Calculate approximate latency based on time since the transaction was added to the mempool + latency := time.Since(time.Now().Add(-1 * time.Second)) // Approximate 1s for processing + success := err == nil && (txResponse == nil || txResponse.Code == 0) + + // Update node performance metrics + UpdateNodePerformance(nodeURL, success, latency) + + // Log performance data for debugging + perf := GetOrCreateNodePerformance(nodeURL) + perf.mutex.RLock() + successRate := 0.0 + if perf.SuccessCount+perf.FailureCount > 0 { + successRate = float64(perf.SuccessCount) * 100 / float64(perf.SuccessCount+perf.FailureCount) + } + perf.mutex.RUnlock() + + if success { + log.Printf("TX SUCCESS on node %s (success rate: %.1f%%, avg latency: %v)", + nodeURL, successRate, perf.AverageLatency) + } else { + // Process error response for sequence and fee information + if txResponse != nil && txResponse.Code != 0 { + if txResponse.RawLog != "" { + // Handle specific error cases - this updates our local tracking + ProcessBroadcastResponse(nodeURL, "", sequence, []byte(txResponse.RawLog)) + + // Log error details for debugging + log.Printf("TX FAILED on node %s: code=%d, log=%s (success rate: %.1f%%)", + nodeURL, txResponse.Code, truncateErrorString(txResponse.RawLog, 100), successRate) + } + } else if err != nil { + // Add error to queue for pattern matching + regexErrorQueue.AddError(err.Error()) + ProcessBroadcastResponse(nodeURL, "", sequence, []byte(err.Error())) + + // Log error details for debugging + log.Printf("TX ERROR on node %s: %s (success rate: %.1f%%)", + nodeURL, truncateErrorString(err.Error(), 100), successRate) } } } -// SendTx is a high-level function that builds, signs, and broadcasts a transaction +// SendTx builds, signs, and broadcasts a transaction func SendTx( ctx context.Context, txParams types.TransactionParams, sequence uint64, broadcastMode string, ) (*sdk.TxResponse, error) { - // Build and sign the transaction - txBytes, err := BuildAndSignTransaction(ctx, txParams, sequence, nil) + startTime := time.Now() + + // Check if this account has an assigned node + if txParams.AcctAddress != "" { + if assignedNode, hasAssignment := GetNodeForAccount(txParams.AcctAddress); hasAssignment { + // Use the assigned node for this account + log.Printf("Using assigned node %s for account %s", assignedNode, txParams.AcctAddress) + txParams.NodeURL = assignedNode + + // Get sequence from node manager if available, but only for this specific node + // Don't try to coordinate sequences across nodes to maintain divergent mempools + if cachedSeq, hasSequence := GetAccountSequence(txParams.AcctAddress); hasSequence && cachedSeq > sequence { + log.Printf("Using cached sequence %d instead of %d for account %s on node %s (node-specific)", + cachedSeq, sequence, txParams.AcctAddress, assignedNode) + sequence = cachedSeq + } + } else if len(txParams.Config.Nodes.RPC) > 0 { + // No assignment yet, pick the best performing node based on metrics + healthyNodes := GetHealthyNodesOnly(txParams.Config.Nodes.RPC) + if len(healthyNodes) > 0 { + // Find the node with highest success rate and lowest latency + bestNode := healthyNodes[0] + bestScore := 0.0 + + for _, node := range healthyNodes { + perf := GetOrCreateNodePerformance(node) + perf.mutex.RLock() + + // Calculate score based on success rate and latency + total := perf.SuccessCount + perf.FailureCount + score := 1.0 // Default score + + if total > 0 { + successRate := float64(perf.SuccessCount) / float64(total) + // Latency factor - lower is better + latencyFactor := 1.0 + if perf.AverageLatency > 0 { + latencyFactor = 1.0 / float64(perf.AverageLatency.Milliseconds()+1) + } + // Combined score + score = successRate*0.7 + latencyFactor*0.3 + } + + perf.mutex.RUnlock() + + if score > bestScore { + bestScore = score + bestNode = node + } + } + + txParams.NodeURL = bestNode + AssignNodeToAccount(txParams.AcctAddress, bestNode) + log.Printf("Assigned best performing node %s to account %s (score: %.2f)", + bestNode, txParams.AcctAddress, bestScore) + } + } + } + + // Set a timeout for the entire transaction process + timeoutCtx, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + + // Try to build and sign the transaction, with retries + var txBytes []byte + var err error + maxBuildRetries := 2 // Limit retries to prevent infinite loops + + for retry := 0; retry <= maxBuildRetries; retry++ { + txBytes, err = BuildAndSignTransaction(timeoutCtx, txParams, sequence, nil) + if err == nil { + break // Success, exit the retry loop + } + + // If build fails, log and retry with a delay unless we've exhausted retries + if retry < maxBuildRetries { + log.Printf("WARNING: Failed to build transaction on attempt %d: %v. Retrying...", + retry+1, err) + time.Sleep(50 * time.Millisecond) + } + } + if err != nil { - return nil, fmt.Errorf("failed to build and sign transaction: %w", err) + return nil, fmt.Errorf("failed to build and sign transaction after %d attempts: %w", + maxBuildRetries+1, err) } - // Get client context for broadcast + // Set up client context for broadcasting clientCtx, err := GetClientContext(txParams.Config, txParams.NodeURL) if err != nil { return nil, fmt.Errorf("failed to get client context: %w", err) } - // Broadcast using our wrapper functions to ensure sequence & fee tracking - resp, err := BroadcastTx(ctx, clientCtx, txBytes, txParams, sequence, broadcastMode) + // Broadcast the transaction + var resp *sdk.TxResponse + resp, err = BroadcastTx(timeoutCtx, clientCtx, txBytes, txParams, sequence, broadcastMode) + + // Calculate transaction latency + txLatency := time.Since(startTime) + + // Check for sequence mismatch error and retry if needed if err != nil { - // Check if it's a sequence or fee error that we can recover from - if strings.Contains(err.Error(), "account sequence mismatch") || - strings.Contains(err.Error(), "insufficient fees") { - // Log retry attempt - fmt.Printf("Retrying transaction due to recoverable error: %v\n", err) - - // Get updated sequence from error if possible - // This is a fallback - our ProcessBroadcastResponse should already have updated the sequence - correctSeq := sequence - seqRegex := regexp.MustCompile(sequenceMismatchPattern) - if matches := seqRegex.FindStringSubmatch(err.Error()); len(matches) > 1 { - correctSeq, _ = strconv.ParseUint(matches[1], 10, 64) - } else { - // If we couldn't extract sequence from error, use our cached sequence - nodeSettings := getNodeSettings(txParams.NodeURL) - nodeSettings.mutex.RLock() - if nodeSettings.LastSequence > sequence { - correctSeq = nodeSettings.LastSequence + errorStr := err.Error() + if strings.Contains(errorStr, "account sequence mismatch") || + strings.Contains(errorStr, "incorrect account sequence") { + + log.Printf("Sequence mismatch detected for node %s, retrying with corrected sequence...", + txParams.NodeURL) + + // Add the error to our queue if not already added in BroadcastTx + regexErrorQueue.AddError(errorStr) + + // Use a backoff delay before retrying to give the node's mempool a chance to process + time.Sleep(100 * time.Millisecond) + + return retryWithCorrectedSequence(timeoutCtx, clientCtx, txBytes, txParams, sequence, broadcastMode) + } + + // For other types of errors, log but don't retry to avoid cascading failures + log.Printf("Transaction error on node %s (not retrying): %v", txParams.NodeURL, err) + } + + // Process the broadcast result for tracking + ProcessTxBroadcastResult(resp, err, txParams.NodeURL, sequence) + + // Update sequence in node manager if successful, but only for this node + if resp != nil && resp.Code == 0 && txParams.AcctAddress != "" { + // We only update the sequence for this specific node to maintain divergent mempools + UpdateAccountSequence(txParams.AcctAddress, sequence+1) + log.Printf("Updated sequence for %s to %d in node-specific cache for %s (latency: %v)", + txParams.AcctAddress, sequence+1, txParams.NodeURL, txLatency) + } + + return resp, err +} + +// retryWithCorrectedSequence retries a transaction with a corrected sequence number +func retryWithCorrectedSequence( + ctx context.Context, + clientCtx sdkclient.Context, + txBytes []byte, + txParams types.TransactionParams, + sequence uint64, + broadcastMode string, +) (*sdk.TxResponse, error) { + // Get all recent error messages to search for sequence information + errorStr := regexErrorQueue.GetErrorString() + log.Printf("RETRY DEBUG: Analyzing error queue for sequence and fee issues on node %s", + txParams.NodeURL) + + // Extract sequence using our helper function + correctSeq := extractSequenceFromError(errorStr) + + // If no sequence found, increment by 1 (but never decrement) + if correctSeq == 0 || correctSeq < sequence { + // Fallback: increment sequence by 1 + correctSeq = sequence + 1 + log.Printf("SEQUENCE FALLBACK: No valid sequence extracted, using sequence+1: %d", correctSeq) + } else { + log.Printf("SEQUENCE MATCH: Extracted sequence %d from mempool error (was: %d)", + correctSeq, sequence) + } + + // Update our tracking with this corrected sequence + updateSequence(txParams.NodeURL, correctSeq) + + // Check if we also need to adjust fees + requiredFee := extractFeeFromError(errorStr, txParams.Config.Denom) + if requiredFee > 0 { + // Log the fee adjustment + log.Printf("FEE ADJUSTMENT: Required fee is %d%s, will use in retry", + requiredFee, txParams.Config.Denom) + + // Add a 10% buffer to ensure success + adjustedFee := uint64(float64(requiredFee) * 1.1) + updateMinimumFee(txParams.NodeURL, txParams.Config.Denom, adjustedFee) + } + + // Log retry information + log.Printf("RETRY TX: Node %s requires sequence %d (was: %d)", + txParams.NodeURL, correctSeq, sequence) + + // Add a small delay before retrying to give the node a chance to process + time.Sleep(200 * time.Millisecond) + + // Broadcast using the corrected sequence, but don't reuse the same txBytes + // We need to build a new transaction with the correct sequence and possibly updated fee + return SendTx(ctx, txParams, correctSeq, broadcastMode) +} + +// truncateErrorString truncates a long error string for logging +func truncateErrorString(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] + "..." +} + +// Simple node assignment system +var ( + nodeAssignments = make(map[string]string) // address -> nodeURL + nodeSequences = make(map[string]uint64) // address -> sequence + nodeAssignmentsMutex sync.RWMutex + + // Node performance tracking + nodePerformance = make(map[string]*NodePerformance) // nodeURL -> performance stats + nodePerformanceMutex sync.RWMutex +) + +// NodePerformance tracks performance statistics for a node +type NodePerformance struct { + SuccessCount int + FailureCount int + AverageLatency time.Duration + LastSuccessTime time.Time + IsHealthy bool + mutex sync.RWMutex +} + +// GetOrCreateNodePerformance gets or creates a node performance tracker +func GetOrCreateNodePerformance(nodeURL string) *NodePerformance { + nodePerformanceMutex.Lock() + defer nodePerformanceMutex.Unlock() + + if perf, exists := nodePerformance[nodeURL]; exists { + return perf + } + + perf := &NodePerformance{ + IsHealthy: true, // Assume healthy until proven otherwise + } + nodePerformance[nodeURL] = perf + return perf +} + +// UpdateNodePerformance updates performance metrics for a node +func UpdateNodePerformance(nodeURL string, success bool, latency time.Duration) { + perf := GetOrCreateNodePerformance(nodeURL) + perf.mutex.Lock() + defer perf.mutex.Unlock() + + if success { + perf.SuccessCount++ + perf.LastSuccessTime = time.Now() + + // Update average latency + if perf.AverageLatency == 0 { + perf.AverageLatency = latency + } else { + perf.AverageLatency = (perf.AverageLatency*9 + latency) / 10 // 90% old + 10% new + } + } else { + perf.FailureCount++ + + // Mark node as unhealthy if it hasn't had a success in over 30 seconds + if time.Since(perf.LastSuccessTime) > 30*time.Second && perf.LastSuccessTime.Unix() > 0 { + perf.IsHealthy = false + } + } +} + +// GetHealthyNodesOnly returns a list of currently healthy nodes +func GetHealthyNodesOnly(nodes []string) []string { + if len(nodes) <= 1 { + return nodes // If only one node, return it regardless of health + } + + nodePerformanceMutex.RLock() + defer nodePerformanceMutex.RUnlock() + + healthyNodes := make([]string, 0, len(nodes)) + for _, nodeURL := range nodes { + if perf, exists := nodePerformance[nodeURL]; exists { + perf.mutex.RLock() + if perf.IsHealthy { + healthyNodes = append(healthyNodes, nodeURL) + } + perf.mutex.RUnlock() + } else { + // If no performance data, assume node is healthy + healthyNodes = append(healthyNodes, nodeURL) + } + } + + // If all nodes are unhealthy, return all nodes (better than nothing) + if len(healthyNodes) == 0 { + return nodes + } + + return healthyNodes +} + +// AssignNodesToAccounts distributes accounts across nodes in a load-balanced way +// Returns a map of address -> nodeURL +func AssignNodesToAccounts(addresses []string, nodes []string) map[string]string { + if len(nodes) == 0 || len(addresses) == 0 { + return map[string]string{} + } + + // Get only healthy nodes if possible + healthyNodes := GetHealthyNodesOnly(nodes) + if len(healthyNodes) == 0 { + healthyNodes = nodes // Fall back to all nodes if none are healthy + } + + nodePerformanceMutex.RLock() + // Sort nodes by performance (success rate and latency) + // This will prioritize nodes with better performance + type nodeWithScore struct { + url string + score float64 + } + + scoredNodes := make([]nodeWithScore, 0, len(healthyNodes)) + for _, nodeURL := range healthyNodes { + var score float64 = 1.0 // Default score + + if perf, exists := nodePerformance[nodeURL]; exists { + perf.mutex.RLock() + // Calculate score based on success rate and latency + total := perf.SuccessCount + perf.FailureCount + if total > 0 { + successRate := float64(perf.SuccessCount) / float64(total) + // Latency factor - lower is better + latencyFactor := 1.0 + if perf.AverageLatency > 0 { + latencyFactor = 1.0 / float64(perf.AverageLatency.Milliseconds()+1) } - nodeSettings.mutex.RUnlock() + // Combined score + score = successRate*0.7 + latencyFactor*0.3 } + perf.mutex.RUnlock() + } + + scoredNodes = append(scoredNodes, nodeWithScore{nodeURL, score}) + } + nodePerformanceMutex.RUnlock() + + // Sort by score in descending order (better nodes first) + sort.Slice(scoredNodes, func(i, j int) bool { + return scoredNodes[i].score > scoredNodes[j].score + }) + + // Extract just the URLs in priority order + prioritizedNodes := make([]string, len(scoredNodes)) + for i, n := range scoredNodes { + prioritizedNodes[i] = n.url + } - // Try one more time with corrected sequence - return SendTx(ctx, txParams, correctSeq, broadcastMode) + // Distribute accounts across nodes + assignments := make(map[string]string, len(addresses)) + nodeAssignmentsMutex.Lock() + defer nodeAssignmentsMutex.Unlock() + + // First, try to maintain existing assignments if the node is still in the list + for _, address := range addresses { + if currentNode, exists := nodeAssignments[address]; exists { + // Check if the current node is in our prioritized list + for _, node := range prioritizedNodes { + if node == currentNode { + // Maintain the current assignment + assignments[address] = currentNode + nodeAssignments[address] = currentNode + break + } + } + } + } + + // Then assign remaining accounts evenly across nodes, prioritizing better nodes + unassignedAddresses := make([]string, 0) + for _, address := range addresses { + if _, exists := assignments[address]; !exists { + unassignedAddresses = append(unassignedAddresses, address) } + } - // Not a recoverable error, return it - return resp, err + for i, address := range unassignedAddresses { + nodeIndex := i % len(prioritizedNodes) + nodeURL := prioritizedNodes[nodeIndex] + assignments[address] = nodeURL + nodeAssignments[address] = nodeURL + log.Printf("Assigned node %s to account %s", nodeURL, address) } - return resp, nil + return assignments +} + +// AssignNodeToAccount assigns a node to an account +func AssignNodeToAccount(address, nodeURL string) { + nodeAssignmentsMutex.Lock() + defer nodeAssignmentsMutex.Unlock() + nodeAssignments[address] = nodeURL + log.Printf("Assigned node %s to account %s", nodeURL, address) +} + +// GetNodeForAccount returns the assigned node for an account +func GetNodeForAccount(address string) (string, bool) { + nodeAssignmentsMutex.RLock() + defer nodeAssignmentsMutex.RUnlock() + node, exists := nodeAssignments[address] + return node, exists +} + +// UpdateAccountSequence updates the sequence for an account +func UpdateAccountSequence(address string, sequence uint64) { + nodeAssignmentsMutex.Lock() + defer nodeAssignmentsMutex.Unlock() + nodeSequences[address] = sequence +} + +// GetAccountSequence gets the sequence for an account +func GetAccountSequence(address string) (uint64, bool) { + nodeAssignmentsMutex.RLock() + defer nodeAssignmentsMutex.RUnlock() + sequence, exists := nodeSequences[address] + return sequence, exists } diff --git a/broadcast/utils.go b/broadcast/utils.go index f9229a9..b5951fc 100644 --- a/broadcast/utils.go +++ b/broadcast/utils.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "log" + "net/url" + "regexp" rpchttp "github.com/cometbft/cometbft/rpc/client/http" types "github.com/somatic-labs/meteorite/types" @@ -71,9 +74,40 @@ func GetClientContext(config types.Config, nodeURL string) (client.Context, erro rpcEndpoint = config.Nodes.RPC[0] } - // Create an RPC client - rpcClient, err := rpchttp.New(rpcEndpoint, "/websocket") + // First strip any broadcaster or node identification suffixes from the full URL + // Examples: + // - "http://example.com:1234 (broadcaster 1)" -> "http://example.com:1234" + // - "http://example.com:1234 (node 2)" -> "http://example.com:1234" + baseURL := rpcEndpoint + broadcasterSuffix := regexp.MustCompile(`\s+\(broadcaster\s+\d+\)`) + nodeSuffix := regexp.MustCompile(`\s+\(node\s+\d+\)`) + + if broadcasterSuffix.MatchString(rpcEndpoint) { + baseURL = broadcasterSuffix.ReplaceAllString(rpcEndpoint, "") + log.Printf("Stripped broadcaster suffix from URL: %s -> %s", rpcEndpoint, baseURL) + } else if nodeSuffix.MatchString(rpcEndpoint) { + baseURL = nodeSuffix.ReplaceAllString(rpcEndpoint, "") + log.Printf("Stripped node suffix from URL: %s -> %s", rpcEndpoint, baseURL) + } + + // Now parse the cleaned URL to validate it + parsedURL, err := url.Parse(baseURL) + if err != nil { + log.Printf("ERROR: Could not parse node URL %s: %v", baseURL, err) + return client.Context{}, fmt.Errorf("invalid node URL format: %w", err) + } + + // Additional URL validation + if parsedURL.Scheme == "" || parsedURL.Host == "" { + log.Printf("ERROR: URL missing scheme or host: %s (scheme=%s, host=%s)", + baseURL, parsedURL.Scheme, parsedURL.Host) + return client.Context{}, fmt.Errorf("invalid node URL: missing scheme or host") + } + + // Create an RPC client with the base URL + rpcClient, err := rpchttp.New(baseURL, "/websocket") if err != nil { + log.Printf("ERROR: Failed to create RPC client for %s: %v", baseURL, err) return client.Context{}, fmt.Errorf("failed to create RPC client: %w", err) } @@ -86,7 +120,7 @@ func GetClientContext(config types.Config, nodeURL string) (client.Context, erro BroadcastMode: "block", // Use block broadcast mode TxConfig: txConfig, AccountRetriever: authtypes.AccountRetriever{}, - NodeURI: rpcEndpoint, + NodeURI: baseURL, // Use cleaned URL consistently Client: rpcClient, } @@ -111,15 +145,16 @@ func GetAccountInfo(ctx context.Context, clientCtx client.Context, fromAddress s } // Try to get latest block info to check connection + nodeURI := clientCtx.NodeURI _, err = rpcClient.Status(ctx) if err != nil { - return 0, 0, fmt.Errorf("node connection error: %w", err) + return 0, 0, fmt.Errorf("node connection error for %s: %w", nodeURI, err) } // Get account information from the chain accNum, sequence, err := accountRetriever.GetAccountNumberSequence(clientCtx, address) if err != nil { - return 0, 0, fmt.Errorf("error getting account info: %w", err) + return 0, 0, fmt.Errorf("error getting account info from %s: %w", nodeURI, err) } return accNum, sequence, nil diff --git a/lib/account.go b/lib/account.go index cbd093e..6de1248 100644 --- a/lib/account.go +++ b/lib/account.go @@ -41,7 +41,7 @@ func GetAddressManager() *AddressManager { // isAddressPrefix checks if a string starts with a known Cosmos chain address prefix func isAddressPrefix(prefix string) bool { - knownPrefixes := []string{"cosmos", "osmo", "juno", "sei", "star", "uni"} + knownPrefixes := []string{"cosmos", "osmo", "juno", "sei", "star", "uni", "atone"} for _, known := range knownPrefixes { if prefix == known { return true @@ -180,10 +180,12 @@ func (am *AddressManager) GetRandomAddressWithPrefix(prefix string) (string, err // If we have addresses, pick a random one and convert its prefix if len(am.addresses) > 0 { + fmt.Printf("Selecting from %d loaded addresses for prefix '%s'\n", len(am.addresses), prefix) + // Generate cryptographically secure random number randomBytes := make([]byte, 8) if _, err := rand.Read(randomBytes); err != nil { - return "", err + return "", fmt.Errorf("error generating random number: %w", err) } // Convert to an integer index @@ -192,11 +194,18 @@ func (am *AddressManager) GetRandomAddressWithPrefix(prefix string) (string, err // Get the address and convert it to the requested prefix addr := am.addresses[idx] - return sdk.Bech32ifyAddressBytes(prefix, addr) + bech32Address, err := sdk.Bech32ifyAddressBytes(prefix, addr) + if err != nil { + return "", fmt.Errorf("error converting address to prefix '%s': %w", prefix, err) + } + + fmt.Printf("Successfully converted address to %s (prefix: %s)\n", bech32Address, prefix) + return bech32Address, nil } // If we don't have addresses, return an error - return "", errors.New("no addresses available from balances.csv, fallback to random generation") + return "", fmt.Errorf("no addresses available from balances.csv (initialized: %v, attempted: %v, count: %d), fallback to random generation", + am.initialized, am.loadAttempted, len(am.addresses)) } // GenerateDeterministicAccount generates a deterministic account based on the seed string diff --git a/lib/chainregistry/cli.go b/lib/chainregistry/cli.go index 48416da..2dca168 100644 --- a/lib/chainregistry/cli.go +++ b/lib/chainregistry/cli.go @@ -170,12 +170,12 @@ func GenerateConfigFromChain(selection *ChainSelection) (map[string]interface{}, "prefix": chain.Bech32Prefix, "gas_per_byte": 100, "base_gas": 200000, - "msg_type": "bank_send", + "msg_type": "ibc_transfer", "multisend": true, - "num_multisend": 10, + "num_multisend": 3000, "broadcast_mode": "grpc", "positions": 50, - "slip44": slip44, // Add slip44 value for correct address derivation + "slip44": slip44, } // Add gas configuration diff --git a/lib/peerdiscovery/discovery.go b/lib/peerdiscovery/discovery.go index 05ab089..6720227 100644 --- a/lib/peerdiscovery/discovery.go +++ b/lib/peerdiscovery/discovery.go @@ -27,6 +27,8 @@ type PeerDiscovery struct { chainID string visitedNodes map[string]bool openRPCEndpoints []string + registryNodes map[string]bool // Track which nodes came from the registry + nonRegistryNodes []string // Special list of non-registry nodes visitorMutex sync.RWMutex resultsMutex sync.RWMutex semaphore chan struct{} @@ -38,14 +40,26 @@ type PeerDiscovery struct { func New(initialEndpoints []string) *PeerDiscovery { ctx, cancel := context.WithCancel(context.Background()) - return &PeerDiscovery{ + pd := &PeerDiscovery{ initialEndpoints: initialEndpoints, visitedNodes: make(map[string]bool), openRPCEndpoints: make([]string, 0), + registryNodes: make(map[string]bool), + nonRegistryNodes: make([]string, 0), semaphore: make(chan struct{}, MaxConcurrentChecks), ctx: ctx, cancel: cancel, } + + // Mark all initial endpoints as registry nodes + for _, endpoint := range initialEndpoints { + normalized := normalizeEndpoint(endpoint) + if normalized != "" { + pd.registryNodes[normalized] = true + } + } + + return pd } // DiscoverPeers discovers peers with open RPCs and returns their endpoints @@ -169,6 +183,15 @@ func (pd *PeerDiscovery) checkNode(nodeAddr string) { // Add to open RPC endpoints pd.resultsMutex.Lock() pd.openRPCEndpoints = append(pd.openRPCEndpoints, nodeAddr) + + // Check if this is a non-registry node + if _, isRegistry := pd.registryNodes[nodeAddr]; !isRegistry { + pd.nonRegistryNodes = append(pd.nonRegistryNodes, nodeAddr) + fmt.Printf("Found non-registry node: %s āœ…\n", nodeAddr) + } else { + fmt.Printf("Found registry node: %s\n", nodeAddr) + } + pd.resultsMutex.Unlock() fmt.Printf("Found open RPC endpoint: %s (Chain ID: %s)\n", nodeAddr, pd.chainID) @@ -408,3 +431,40 @@ func bytes4ToUint32(ip net.IP) uint32 { } return uint32(ip[0])<<24 | uint32(ip[1])<<16 | uint32(ip[2])<<8 | uint32(ip[3]) } + +// GetPrioritizedEndpoints returns all discovered endpoints with non-registry nodes first +func (pd *PeerDiscovery) GetPrioritizedEndpoints() []string { + pd.resultsMutex.RLock() + defer pd.resultsMutex.RUnlock() + + // Create a result slice with capacity for all endpoints + result := make([]string, 0, len(pd.openRPCEndpoints)) + + // Track which endpoints we've already added to avoid duplication + added := make(map[string]bool) + + // First add all non-registry nodes + for _, endpoint := range pd.nonRegistryNodes { + result = append(result, endpoint) + added[endpoint] = true + } + + // Then add all other nodes + for _, endpoint := range pd.openRPCEndpoints { + if !added[endpoint] { + result = append(result, endpoint) + } + } + + return result +} + +// GetNonRegistryNodes returns only the non-registry nodes +func (pd *PeerDiscovery) GetNonRegistryNodes() []string { + pd.resultsMutex.RLock() + defer pd.resultsMutex.RUnlock() + + result := make([]string, len(pd.nonRegistryNodes)) + copy(result, pd.nonRegistryNodes) + return result +} diff --git a/main.go b/main.go index 5d40e39..bcebe0f 100644 --- a/main.go +++ b/main.go @@ -1,628 +1,28 @@ package main import ( - "context" - "errors" "flag" - "fmt" "log" - "os" - "sync" - "time" - "github.com/BurntSushi/toml" - "github.com/somatic-labs/meteorite/broadcast" - "github.com/somatic-labs/meteorite/client" - "github.com/somatic-labs/meteorite/lib" "github.com/somatic-labs/meteorite/modes/registry" - bankmodule "github.com/somatic-labs/meteorite/modules/bank" - "github.com/somatic-labs/meteorite/types" - - sdkmath "cosmossdk.io/math" - - sdk "github.com/cosmos/cosmos-sdk/types" ) const ( - SeedphraseFile = "seedphrase" - BalanceThreshold = 0.05 - BatchSize = 1000 - TimeoutDuration = 50 * time.Millisecond - DefaultMultisendRecipients = 3000 // Always use 3000 recipients per multisend - MaxVisualizer = 3600 DefaultVisualizerRefreshMs = 1000 - DefaultGoroutinePoolSize = 20 ) func main() { log.Println("Welcome to Meteorite - Transaction Scaling Framework for Cosmos SDK chains") - // Parse command-line flags first - flags := parseCommandLineFlags() - - // Determine if we're using a config file or the registry - if flags.useConfigFile { - // Config file mode - load config from file - log.Println("Running in config file mode with file:", flags.configFile) - runConfigFileMode(flags) - } else { - // Registry mode is the default - no config file needed - log.Println("Running in zero-configuration registry mode - no config file needed") - if err := registry.RunRegistryMode(); err != nil { - log.Fatalf("Error in registry mode: %v", err) - } - } -} - -// Flags holds the parsed command-line flags -type Flags struct { - useConfigFile bool - configFile string - enableViz bool -} - -// parseCommandLineFlags parses the command-line flags and returns a Flags struct -func parseCommandLineFlags() Flags { - useConfigFile := flag.Bool("config", false, "Use a configuration file instead of zero-configuration registry mode") - configFile := flag.String("f", "nodes.toml", "Path to the configuration file (only used with -config)") + // Parse command-line flags enableViz := flag.Bool("viz", true, "Enable the transaction visualizer") - - // Parse flags flag.Parse() - return Flags{ - useConfigFile: *useConfigFile, - configFile: *configFile, - enableViz: *enableViz, - } -} - -// runConfigFileMode runs the application in config file mode -func runConfigFileMode(flags Flags) { - // Load configuration and setup environment - config, accounts := setupEnvironment(flags.configFile) - - // Print account information - printAccountInformation(accounts, config) - - // Check and adjust balances if needed - if err := checkAndAdjustBalances(accounts, config); err != nil { - log.Fatalf("Failed to handle balance adjustment: %v", err) - } - - // Get chain ID - nodeURL := config.Nodes.RPC[0] // Use the first node - chainID, err := lib.GetChainID(nodeURL) - if err != nil { - log.Fatalf("Failed to get chain ID: %v", err) - } - - // Initialize visualizer if enabled - if flags.enableViz { - initializeVisualizer(config, chainID, len(accounts)) - } - - // Initialize multisend distributor if needed - distributor := initializeDistributor(config, flags.enableViz) - - // Launch transaction broadcasting goroutines - launchTransactionBroadcasters(accounts, config, chainID, distributor, flags.enableViz) - - // Clean up resources - cleanupResources(distributor, flags.enableViz) -} - -// setupEnvironment loads the configuration and sets up the environment -func setupEnvironment(configFile string) (types.Config, []types.Account) { - // Load config from file - config := types.Config{} - if _, err := toml.DecodeFile(configFile, &config); err != nil { - log.Fatalf("Failed to load config: %v", err) - } - - // Always enforce 3000 recipients per multisend when multisend is enabled - if config.Multisend { - if config.NumMultisend != DefaultMultisendRecipients { - log.Printf("āš ļø Overriding NumMultisend from %d to %d for optimal performance", - config.NumMultisend, DefaultMultisendRecipients) - config.NumMultisend = DefaultMultisendRecipients - } - } - - mnemonic, err := os.ReadFile("seedphrase") - if err != nil { - log.Fatalf("Failed to read seed phrase: %v", err) - } - - // Set Bech32 prefixes and seal the configuration once - sdkConfig := sdk.GetConfig() - sdkConfig.SetBech32PrefixForAccount(config.Prefix, config.Prefix+"pub") - sdkConfig.SetBech32PrefixForValidator(config.Prefix+"valoper", config.Prefix+"valoperpub") - sdkConfig.SetBech32PrefixForConsensusNode(config.Prefix+"valcons", config.Prefix+"valconspub") - sdkConfig.Seal() - - // Validate and set positions - accounts := generateAccounts(config, mnemonic) - - return config, accounts -} - -// generateAccounts generates accounts based on the configuration -func generateAccounts(config types.Config, mnemonic []byte) []types.Account { - positions := config.Positions - const MaxPositions = 100 // Adjust based on requirements - if positions <= 0 || positions > MaxPositions { - log.Fatalf("Number of positions must be between 1 and %d, got: %d", MaxPositions, positions) - } - fmt.Println("Positions", positions) - - var accounts []types.Account - for i := uint(0); i < positions; i++ { - position := uint32(i) - privKey, pubKey, acctAddress, err := lib.GetPrivKey(config, mnemonic, position) - if err != nil { - log.Fatalf("Failed to get private key: %v", err) - } - if privKey == nil || pubKey == nil || len(acctAddress) == 0 { - log.Fatalf("Failed to generate keys for position %d", position) - } - accounts = append(accounts, types.Account{ - PrivKey: privKey, - PubKey: pubKey, - Address: acctAddress, - Position: position, - }) - } - - return accounts -} - -// printAccountInformation prints information about accounts and their balances -func printAccountInformation(accounts []types.Account, config types.Config) { - // Print addresses and positions at startup - fmt.Println("Addresses and Positions:") - for _, acct := range accounts { - fmt.Printf("Position %d: Address: %s\n", acct.Position, acct.Address) - } - - // Print addresses and balances - fmt.Println("Wallets and Balances:") - for _, acct := range accounts { - balance, err := lib.GetAccountBalance(acct.Address, config) - if err != nil { - log.Printf("Failed to get balance for %s: %v", acct.Address, err) - continue - } - fmt.Printf("Position %d: Address: %s, Balance: %s %s\n", acct.Position, acct.Address, balance.String(), config.Denom) - } -} - -// checkAndAdjustBalances checks if balances are within the threshold and adjusts them if needed -func checkAndAdjustBalances(accounts []types.Account, config types.Config) error { - // Get balances and ensure they are within 10% of each other - balances, err := lib.GetBalances(accounts, config) - if err != nil { - return fmt.Errorf("failed to get balances: %v", err) - } - - fmt.Println("balances", balances) - - if !lib.CheckBalancesWithinThreshold(balances, 0.10) { - fmt.Println("Account balances are not within 10% of each other. Adjusting balances...") - if err := handleBalanceAdjustment(accounts, balances, config); err != nil { - return err - } - } - - return nil -} - -// initializeVisualizer initializes the transaction visualizer -func initializeVisualizer(config types.Config, chainID string, numAccounts int) { - fmt.Println("Initializing transaction visualizer...") - if err := broadcast.InitVisualizer(config.Nodes.RPC); err != nil { - log.Printf("Warning: Failed to initialize visualizer: %v", err) - } - broadcast.LogVisualizerDebug(fmt.Sprintf("Starting Meteorite test on chain %s with %d accounts", - chainID, numAccounts)) -} - -// initializeDistributor initializes the MultiSendDistributor if needed -func initializeDistributor(config types.Config, enableViz bool) *bankmodule.MultiSendDistributor { - var distributor *bankmodule.MultiSendDistributor - - // Create a multisend distributor if needed - if config.MsgType == "bank_multisend" && config.Multisend { - // Initialize the distributor with RPC endpoints from config - distributor = bankmodule.NewMultiSendDistributor(config, config.Nodes.RPC) - fmt.Printf("Initialized MultiSendDistributor with %d RPC endpoints\n", len(config.Nodes.RPC)) - - if enableViz { - broadcast.LogVisualizerDebug(fmt.Sprintf("Initialized MultiSendDistributor with %d RPC endpoints", - len(config.Nodes.RPC))) - } - - // Start a background goroutine to refresh endpoints periodically - go func() { - for { - time.Sleep(15 * time.Minute) - distributor.RefreshEndpoints() - } - }() - } - - return distributor -} - -// launchTransactionBroadcasters launches goroutines to broadcast transactions -func launchTransactionBroadcasters( - accounts []types.Account, - config types.Config, - chainID string, - distributor *bankmodule.MultiSendDistributor, - enableViz bool, -) { - var wg sync.WaitGroup - - for _, account := range accounts { - wg.Add(1) - go func(acct types.Account) { - defer wg.Done() - processAccount(acct, config, chainID, distributor, enableViz) - }(account) - } - - wg.Wait() -} - -// processAccount handles transaction broadcasting for a single account -func processAccount( - acct types.Account, - config types.Config, - chainID string, - distributor *bankmodule.MultiSendDistributor, - enableViz bool, -) { - // Get account info - sequence, accNum, err := lib.GetAccountInfo(acct.Address, config) - if err != nil { - log.Printf("Failed to get account info for %s: %v", acct.Address, err) - return - } - - // Prepare transaction parameters - txParams := prepareTransactionParams(acct, config, chainID, sequence, accNum, distributor) - - // Log the start of processing for this account - if enableViz { - broadcast.LogVisualizerDebug(fmt.Sprintf("Starting transaction broadcasts for account %s (Position %d)", - acct.Address, acct.Position)) - } - - // Broadcast transactions - successfulTxs, failedTxs, responseCodes, _ := broadcast.Loop(txParams, BatchSize, int(acct.Position)) - - // Print results - printResults(acct.Address, successfulTxs, failedTxs, responseCodes) -} - -// prepareTransactionParams prepares the transaction parameters for an account -func prepareTransactionParams( - acct types.Account, - config types.Config, - chainID string, - sequence uint64, - accNum uint64, - distributor *bankmodule.MultiSendDistributor, -) types.TransactionParams { - var nodeURL string - var txMsgType string + // Registry mode is the only mode - no config file needed + log.Println("Running in zero-configuration registry mode") - if len(config.Nodes.RPC) > 0 { - if distributor != nil { - // Use the distributed approach with multiple RPCs - nodeURL = distributor.GetNextRPC() - } else { - // Use the simple approach with the first RPC - nodeURL = config.Nodes.RPC[0] - } + // Pass visualizer setting to registry mode + if err := registry.RunRegistryMode(*enableViz); err != nil { + log.Fatalf("Error in registry mode: %v", err) } - - // If no node URL is available, use a default - if nodeURL == "" { - nodeURL = "http://localhost:26657" - } - - // Get message type - either from config or default to bank_send - if config.MsgType != "" { - txMsgType = config.MsgType - } else { - txMsgType = "bank_send" - } - - // Convert MsgParams struct to map - msgParamsMap := types.ConvertMsgParamsToMap(config.MsgParams) - - return types.TransactionParams{ - Config: config, - NodeURL: nodeURL, - ChainID: chainID, - Sequence: sequence, - AccNum: accNum, - PrivKey: acct.PrivKey, - PubKey: acct.PubKey, - AcctAddress: acct.Address, - MsgType: txMsgType, - MsgParams: msgParamsMap, - Distributor: distributor, // Pass distributor for multisend operations - } -} - -// printResults prints the results of transaction broadcasting -func printResults(address string, successfulTxs, failedTxs int, responseCodes map[uint32]int) { - fmt.Printf("Account %s: Successful transactions: %d, Failed transactions: %d\n", - address, successfulTxs, failedTxs) - - fmt.Println("Response code breakdown:") - for code, count := range responseCodes { - percentage := float64(count) / float64(successfulTxs+failedTxs) * 100 - fmt.Printf("Code %d: %d (%.2f%%)\n", code, count, percentage) - } -} - -// cleanupResources cleans up resources used by the program -func cleanupResources(distributor *bankmodule.MultiSendDistributor, enableViz bool) { - fmt.Println("All transactions completed. Cleaning up resources...") - if distributor != nil { - distributor.Cleanup() - } - - // Stop the visualizer - if enableViz { - broadcast.StopVisualizer() - } -} - -// adjustBalances transfers funds between accounts to balance their balances within the threshold -func adjustBalances(accounts []types.Account, balances map[string]sdkmath.Int, config types.Config) error { - if len(accounts) == 0 { - return errors.New("no accounts provided for balance adjustment") - } - - // Calculate the total balance - totalBalance := sdkmath.ZeroInt() - for _, balance := range balances { - totalBalance = totalBalance.Add(balance) - } - fmt.Printf("Total Balance across all accounts: %s %s\n", totalBalance.String(), config.Denom) - - if totalBalance.IsZero() { - return errors.New("total balance is zero, nothing to adjust") - } - - numAccounts := sdkmath.NewInt(int64(len(accounts))) - averageBalance := totalBalance.Quo(numAccounts) - fmt.Printf("Number of Accounts: %d, Average Balance per account: %s %s\n", numAccounts.Int64(), averageBalance.String(), config.Denom) - - // Define minimum transfer amount to avoid dust transfers - minTransfer := sdkmath.NewInt(1000000) // Adjust based on your token's decimal places - fmt.Printf("Minimum Transfer Amount to avoid dust: %s %s\n", minTransfer.String(), config.Denom) - - // Create a slice to track balances that need to send or receive funds - type balanceAdjustment struct { - Account types.Account - Amount sdkmath.Int // Positive if needs to receive, negative if needs to send - } - var adjustments []balanceAdjustment - - threshold := averageBalance.MulRaw(10).QuoRaw(100) // threshold = averageBalance * 10 / 100 - fmt.Printf("Balance Threshold for adjustments (10%% of average balance): %s %s\n", threshold.String(), config.Denom) - - for _, acct := range accounts { - currentBalance := balances[acct.Address] - difference := averageBalance.Sub(currentBalance) - - fmt.Printf("Account %s - Current Balance: %s %s, Difference from average: %s %s\n", - acct.Address, currentBalance.String(), config.Denom, difference.String(), config.Denom) - - // Only consider adjustments exceeding the threshold and minimum transfer amount - if difference.Abs().GT(threshold) && difference.Abs().GT(minTransfer) { - adjustments = append(adjustments, balanceAdjustment{ - Account: acct, - Amount: difference, - }) - fmt.Printf("-> Account %s requires adjustment of %s %s\n", acct.Address, difference.String(), config.Denom) - } else { - fmt.Printf("-> Account %s is within balance threshold, no adjustment needed\n", acct.Address) - } - } - - // Separate adjustments into senders (negative amounts) and receivers (positive amounts) - var senders, receivers []balanceAdjustment - for _, adj := range adjustments { - if adj.Amount.IsNegative() { - // Check if the account has enough balance to send - accountBalance := balances[adj.Account.Address] - fmt.Printf("Sender Account %s - Balance: %s %s, Surplus: %s %s\n", - adj.Account.Address, accountBalance.String(), config.Denom, adj.Amount.Abs().String(), config.Denom) - - if accountBalance.GT(sdkmath.ZeroInt()) { - senders = append(senders, adj) - } else { - fmt.Printf("-> Account %s has zero balance, cannot send funds.\n", adj.Account.Address) - } - } else if adj.Amount.IsPositive() { - fmt.Printf("Receiver Account %s - Needs: %s %s\n", - adj.Account.Address, adj.Amount.String(), config.Denom) - receivers = append(receivers, adj) - } - } - - // Perform transfers from senders to receivers - for _, sender := range senders { - // The total amount the sender needs to transfer (their surplus) - amountToSend := sender.Amount.Abs() - fmt.Printf("\nStarting transfers from Sender Account %s - Total Surplus to send: %s %s\n", - sender.Account.Address, amountToSend.String(), config.Denom) - - // Iterate over the receivers who need funds - for i := range receivers { - receiver := &receivers[i] - - // Check if the receiver still needs funds - if receiver.Amount.GT(sdkmath.ZeroInt()) { - // Determine the amount to transfer: - // It's the minimum of what the sender can send and what the receiver needs - transferAmount := sdkmath.MinInt(amountToSend, receiver.Amount) - - fmt.Printf("Transferring %s %s from %s to %s\n", - transferAmount.String(), config.Denom, sender.Account.Address, receiver.Account.Address) - - // Transfer funds from the sender to the receiver - err := TransferFunds(sender.Account, receiver.Account.Address, transferAmount, config) - if err != nil { - return fmt.Errorf("failed to transfer funds from %s to %s: %v", - sender.Account.Address, receiver.Account.Address, err) - } - - fmt.Printf("-> Successfully transferred %s %s from %s to %s\n", - transferAmount.String(), config.Denom, sender.Account.Address, receiver.Account.Address) - - // Update the sender's remaining amount to send - amountToSend = amountToSend.Sub(transferAmount) - fmt.Printf("Sender %s remaining surplus to send: %s %s\n", - sender.Account.Address, amountToSend.String(), config.Denom) - - // Update the receiver's remaining amount to receive - receiver.Amount = receiver.Amount.Sub(transferAmount) - fmt.Printf("Receiver %s remaining amount needed: %s %s\n", - receiver.Account.Address, receiver.Amount.String(), config.Denom) - - // If the sender has sent all their surplus, move to the next sender - if amountToSend.IsZero() { - fmt.Printf("Sender %s has sent all surplus funds.\n", sender.Account.Address) - break - } - } else { - fmt.Printf("Receiver %s no longer needs funds.\n", receiver.Account.Address) - } - } - } - - fmt.Println("\nBalance adjustment complete.") - return nil -} - -func TransferFunds(sender types.Account, receiverAddress string, amount sdkmath.Int, config types.Config) error { - // Create a transaction params struct for the funds transfer - txParams := types.TransactionParams{ - Config: config, - NodeURL: config.Nodes.RPC[0], - ChainID: config.Chain, - PrivKey: sender.PrivKey, - PubKey: sender.PubKey, - AcctAddress: sender.Address, - MsgType: "bank_send", - MsgParams: map[string]interface{}{ - "from_address": sender.Address, - "to_address": receiverAddress, - "amount": amount.Int64(), - "denom": config.Denom, - }, - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - maxRetries := 3 - for attempt := 0; attempt < maxRetries; attempt++ { - fmt.Printf("Attempt %d to send transaction with sequence %d\n", attempt+1, txParams.Sequence) - - // Create GRPC client with proper error handling - grpcClient, err := client.NewGRPCClient(config.Nodes.GRPC) - if err != nil { - fmt.Printf("Failed to create GRPC client: %v\n", err) - continue - } - - resp, _, err := broadcast.SendTransactionViaGRPC(ctx, txParams, txParams.Sequence, grpcClient) - if err != nil { - fmt.Printf("Transaction failed: %v\n", err) - - // Check if the error is a sequence mismatch error - if resp != nil && resp.Code == 32 { - expectedSeq, parseErr := lib.ExtractExpectedSequence(resp.RawLog) - if parseErr == nil { - // Update sequence and retry - txParams.Sequence = expectedSeq - fmt.Printf("Sequence mismatch detected. Updating sequence to %d and retrying...\n", expectedSeq) - continue - } - } - continue - } - - if resp.Code != 0 { - fmt.Printf("Transaction failed with code %d: %s\n", resp.Code, resp.RawLog) - - // Check for sequence mismatch error - if resp.Code == 32 { - expectedSeq, parseErr := lib.ExtractExpectedSequence(resp.RawLog) - if parseErr == nil { - // Update sequence and retry - txParams.Sequence = expectedSeq - fmt.Printf("Sequence mismatch detected. Updating sequence to %d and retrying...\n", expectedSeq) - continue - } - } - return fmt.Errorf("transaction failed with code %d: %s", resp.Code, resp.RawLog) - } - - // Successfully broadcasted transaction - fmt.Printf("-> Successfully transferred %s %s from %s to %s\n", - amount.String(), config.Denom, sender.Address, receiverAddress) - return nil - } - - return fmt.Errorf("failed to send transaction after %d attempts", maxRetries) -} - -// Add this new function -func handleBalanceAdjustment(accounts []types.Account, balances map[string]sdkmath.Int, config types.Config) error { - if err := adjustBalances(accounts, balances, config); err != nil { - return fmt.Errorf("failed to adjust balances: %v", err) - } - - balances, err := lib.GetBalances(accounts, config) - if err != nil { - return fmt.Errorf("failed to get balances after adjustment: %v", err) - } - - if !shouldProceedWithBalances(balances) { - return errors.New("account balances are still not within threshold after adjustment") - } - - return nil -} - -func shouldProceedWithBalances(balances map[string]sdkmath.Int) bool { - if lib.CheckBalancesWithinThreshold(balances, 0.15) { - fmt.Println("Balances successfully adjusted within acceptable range") - return true - } - - var maxBalance sdkmath.Int - for _, balance := range balances { - if balance.GT(maxBalance) { - maxBalance = balance - } - } - - minSignificantBalance := sdkmath.NewInt(1000000) - if maxBalance.LT(minSignificantBalance) { - fmt.Println("Remaining balance differences are below minimum threshold, proceeding") - return true - } - - return false } diff --git a/main_test.go b/main_test.go deleted file mode 100644 index 72dd63d..0000000 --- a/main_test.go +++ /dev/null @@ -1,393 +0,0 @@ -package main - -import ( - "fmt" - "net/http" - "net/http/httptest" - "os" - "strings" - "testing" - - "github.com/cosmos/go-bip39" - "github.com/cosmos/ibc-go/modules/apps/callbacks/testing/simapp/params" - "github.com/somatic-labs/meteorite/broadcast" - "github.com/somatic-labs/meteorite/lib" - "github.com/somatic-labs/meteorite/types" - - sdkmath "cosmossdk.io/math" - - "github.com/cosmos/cosmos-sdk/crypto/hd" - "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" - sdk "github.com/cosmos/cosmos-sdk/types" -) - -func TestExtractExpectedSequence(t *testing.T) { - tests := []struct { - name string - errMsg string - want uint64 - wantErr bool - }{ - { - name: "valid error message", - errMsg: "account sequence mismatch, expected 42, got 41: incorrect account sequence", - want: 42, - wantErr: false, - }, - { - name: "missing expected keyword", - errMsg: "account sequence mismatch, sequence 42, got 41", - want: 0, - wantErr: true, - }, - { - name: "invalid sequence number", - errMsg: "account sequence mismatch, expected abc, got 41", - want: 0, - wantErr: true, - }, - { - name: "empty error message", - errMsg: "", - want: 0, - wantErr: true, - }, - { - name: "large sequence number", - errMsg: "account sequence mismatch, expected 18446744073709551615, got 41", - want: 18446744073709551615, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := lib.ExtractExpectedSequence(tt.errMsg) - if (err != nil) != tt.wantErr { - t.Errorf("extractExpectedSequence() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("extractExpectedSequence() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestTransferFunds(t *testing.T) { - // Generate a random mnemonic - entropy, err := bip39.NewEntropy(256) - if err != nil { - t.Fatalf("Failed to generate entropy: %v", err) - } - mnemonic, err := bip39.NewMnemonic(entropy) - if err != nil { - t.Fatalf("Failed to generate mnemonic: %v", err) - } - - // Create key from mnemonic - seed := bip39.NewSeed(mnemonic, "") - master, ch := hd.ComputeMastersFromSeed(seed) - path := hd.NewFundraiserParams(0, sdk.CoinType, 0).String() - privKey, err := hd.DerivePrivateKeyForPath(master, ch, path) - if err != nil { - t.Fatalf("Failed to derive private key: %v", err) - } - - secp256k1PrivKey := &secp256k1.PrivKey{Key: privKey} - pubKey := secp256k1PrivKey.PubKey() - - tests := []struct { - name string - sender types.Account - receiver string - amount sdkmath.Int - config types.Config - expectedError string - }{ - { - name: "invalid prefix", - sender: types.Account{ - PrivKey: secp256k1PrivKey, - PubKey: pubKey, - Address: "cosmos1uqrar205hjv4s8832kwj8e6xhwvk4x0eqml043", - Position: 0, - }, - receiver: "cosmos1paefpxvjvmmq03gvsfjzwut0zap7z5nq8r99sf", - amount: sdkmath.NewInt(3123890412), - config: types.Config{ - Chain: "cosmoshub-4", - Prefix: "cosmos", - Denom: "uatom", - Nodes: types.NodesConfig{ - RPC: []string{"http://127.0.0.1:26657"}, - API: "http://localhost:1317", - GRPC: "localhost:9090", - }, - }, - expectedError: "failed to get account info", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := TransferFunds(tt.sender, tt.receiver, tt.amount, tt.config) - if err == nil { - t.Error("expected error but got none") - return - } - if !strings.Contains(err.Error(), tt.expectedError) { - t.Errorf("expected error containing %q, got %q", tt.expectedError, err.Error()) - } - }) - } -} - -func TestAdjustBalancesWithSeedPhrase(t *testing.T) { - // Create a temporary seed phrase file - mnemonic := []byte("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") - tmpfile, err := os.CreateTemp(t.TempDir(), "seedphrase") - if err != nil { - t.Fatalf("Failed to create temp file: %v", err) - } - defer os.Remove(tmpfile.Name()) - - if _, err := tmpfile.Write(mnemonic); err != nil { - t.Fatalf("Failed to write to temp file: %v", err) - } - if err := tmpfile.Close(); err != nil { - t.Fatalf("Failed to close temp file: %v", err) - } - - // Set up test config - config := types.Config{ - Chain: "test-chain", - Prefix: "cosmos", - Denom: "uatom", - Slip44: 118, - Nodes: types.NodesConfig{ - RPC: []string{"http://localhost:26657"}, - API: "http://localhost:1317", - }, - } - - // Create test accounts from seed phrase - var accounts []types.Account - for i := 0; i < 3; i++ { // Create 3 test accounts - position := uint32(i) - privKey, pubKey, acctAddress, err := lib.GetPrivKey(config, mnemonic, position) - if err != nil { - t.Fatalf("Failed to generate keys for position %d: %v", position, err) - } - accounts = append(accounts, types.Account{ - PrivKey: privKey, - PubKey: pubKey, - Address: acctAddress, - Position: position, - }) - } - - // Set up mock balances where only the 0th position is funded - balances := map[string]sdkmath.Int{ - accounts[0].Address: sdkmath.NewInt(1000000), - accounts[1].Address: sdkmath.ZeroInt(), - accounts[2].Address: sdkmath.ZeroInt(), - } - - // Create a test server to mock the API responses - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - // Mock successful transaction response - fmt.Fprintln(w, `{"height":"1","txhash":"hash","code":0}`) - })) - defer ts.Close() - config.Nodes.API = ts.URL - config.Nodes.RPC = []string{ts.URL} - - // Run adjustBalances - err = adjustBalances(accounts, balances, config) - if err != nil { - t.Errorf("adjustBalances() error = %v", err) - } - - // Verify that balances were attempted to be adjusted - // Note: In a real scenario, you'd want to verify the actual balance changes, - // but since we're using a mock server, we're just verifying the function ran without error -} - -func TestAdjustBalances(t *testing.T) { - tests := []struct { - name string - accounts []types.Account - balances map[string]sdkmath.Int - config types.Config - wantErr bool - }{ - { - name: "empty accounts list", - accounts: []types.Account{}, - balances: map[string]sdkmath.Int{}, - config: types.Config{ - Denom: "uom", - }, - wantErr: true, - }, - { - name: "zero total balance", - accounts: []types.Account{ - {Address: "cosmos1test1"}, - {Address: "cosmos1test2"}, - }, - balances: map[string]sdkmath.Int{ - "cosmos1test1": sdkmath.ZeroInt(), - "cosmos1test2": sdkmath.ZeroInt(), - }, - config: types.Config{ - Denom: "uom", - }, - wantErr: true, - }, - { - name: "uneven balances need adjustment", - accounts: []types.Account{ - {Address: "cosmos1test1"}, - {Address: "cosmos1test2"}, - }, - balances: map[string]sdkmath.Int{ - "cosmos1test1": sdkmath.NewInt(1000000), - "cosmos1test2": sdkmath.NewInt(0), - }, - config: types.Config{ - Denom: "uom", - Nodes: types.NodesConfig{ - RPC: []string{"http://localhost:26657"}, - API: "http://localhost:1317", - }, - }, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := adjustBalances(tt.accounts, tt.balances, tt.config) - if (err != nil) != tt.wantErr { - t.Errorf("adjustBalances() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func TestBuildAndSignTransaction(t *testing.T) { - tests := []struct { - name string - txParams types.TransactionParams - sequence uint64 - wantErr bool - errorMatch string - }{ - { - name: "invalid message type", - txParams: types.TransactionParams{ - MsgType: "invalid_type", - Config: types.Config{ - Denom: "uatom", - }, - }, - sequence: 0, - wantErr: true, - errorMatch: "unsupported message type", - }, - { - name: "missing private key", - txParams: types.TransactionParams{ - MsgType: "bank_send", - Config: types.Config{ - Denom: "uatom", - }, - PrivKey: nil, - }, - sequence: 0, - wantErr: true, - errorMatch: "invalid from address: empty address string is not allowed", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx := t.Context() - encodingConfig := params.MakeTestEncodingConfig() - _, err := broadcast.BuildAndSignTransaction(ctx, tt.txParams, tt.sequence, encodingConfig) - if (err != nil) != tt.wantErr { - t.Errorf("BuildAndSignTransaction() error = %v, wantErr %v", err, tt.wantErr) - return - } - if err != nil && !strings.Contains(err.Error(), tt.errorMatch) { - t.Errorf("BuildAndSignTransaction() error = %v, want error containing %v", err, tt.errorMatch) - } - }) - } -} - -func TestGetAccountInfo(t *testing.T) { - // Create a test server to mock the API responses - tests := []struct { - name string - address string - mockResp string - wantSeq uint64 - wantAccNum uint64 - wantErr bool - }{ - { - name: "valid response", - address: "cosmos1test1", - mockResp: `{ - "account": { - "sequence": "42", - "account_number": "26" - } - }`, - wantSeq: 42, - wantAccNum: 26, - wantErr: false, - }, - { - name: "invalid sequence", - address: "cosmos1test2", - mockResp: `{ - "account": { - "sequence": "invalid", - "account_number": "26" - } - }`, - wantSeq: 0, - wantAccNum: 0, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - fmt.Fprintln(w, tt.mockResp) - })) - defer ts.Close() - - config := types.Config{ - Nodes: types.NodesConfig{ - API: ts.URL, - }, - } - - seq, accNum, err := lib.GetAccountInfo(tt.address, config) - if (err != nil) != tt.wantErr { - t.Errorf("GetAccountInfo() error = %v, wantErr %v", err, tt.wantErr) - return - } - if seq != tt.wantSeq || accNum != tt.wantAccNum { - t.Errorf("GetAccountInfo() = (%v, %v), want (%v, %v)", - seq, accNum, tt.wantSeq, tt.wantAccNum) - } - }) - } -} diff --git a/modes/registry/registry.go b/modes/registry/registry.go index d37fb5a..63378c9 100644 --- a/modes/registry/registry.go +++ b/modes/registry/registry.go @@ -10,7 +10,6 @@ import ( "path/filepath" "sort" "strings" - "sync" "time" "github.com/somatic-labs/meteorite/broadcast" @@ -34,7 +33,7 @@ const ( ) // RunRegistryMode runs the registry mode UI -func RunRegistryMode() error { +func RunRegistryMode(enableViz bool) error { fmt.Println("Meteorite Chain Registry Tester") fmt.Println("==============================") @@ -99,38 +98,13 @@ func RunRegistryMode() error { os.Stdout = logFile } - // User input - should we run the test immediately or save to file? - // This part now has discovery logs redirected to a file - reader := bufio.NewReader(os.Stdin) - // Restore stdout for user interaction if logFile != nil { os.Stdout = originalStdout } - fmt.Println("\nšŸš€ Do you want to:") - fmt.Println(" 1. Run the test immediately") - fmt.Println(" 2. Save configuration to file and exit") - fmt.Print("\nEnter your choice (1 or 2): ") - - choice, err := reader.ReadString('\n') - if err != nil { - return fmt.Errorf("error reading input: %w", err) - } - - choice = strings.TrimSpace(choice) - - switch choice { - case "1": - fmt.Println("\nšŸš€ Running chain test...") - return runChainTest(selection, configMap) - case "2": - fmt.Println("\nšŸ’¾ Saving configuration to file...") - return saveConfigToFile(selection, configMap) - default: - fmt.Println("\nāŒ Invalid choice. Exiting.") - return nil - } + fmt.Println("\nšŸš€ Running chain test...") + return runChainTest(selection, configMap, enableViz) } // saveConfigToFile saves the configuration to a TOML file @@ -251,12 +225,7 @@ func saveConfigToFile(selection *chainregistry.ChainSelection, configMap map[str } // runChainTest runs the chain test using the provided configuration -func runChainTest(selection *chainregistry.ChainSelection, configMap map[string]interface{}) error { - // Check if seedphrase file exists - if _, err := os.Stat("seedphrase"); os.IsNotExist(err) { - return errors.New("seedphrase file not found in current directory") - } - +func runChainTest(selection *chainregistry.ChainSelection, configMap map[string]interface{}, enableViz bool) error { // Convert map to types.Config config := mapToConfig(configMap) @@ -266,6 +235,14 @@ func runChainTest(selection *chainregistry.ChainSelection, configMap map[string] fmt.Println("Enforcing 3000 recipients per multisend transaction for optimal performance") } + // Make sure we have a channel configured for IBC transfers + if config.Channel == "" { + // Set a default channel - this will only be used as a fallback + config.Channel = "channel-0" + fmt.Println("Warning: No IBC channel specified, using default channel-0") + fmt.Println("IBC transfers may fail if the chain doesn't have this channel configured") + } + // Print the configuration to help with debugging printConfig(config) @@ -287,32 +264,15 @@ func runChainTest(selection *chainregistry.ChainSelection, configMap map[string] } // Optimize gas settings for the specific message type - switch config.MsgType { - case "bank_send": - // Bank send typically needs less gas - config.BaseGas = 80000 - config.GasPerByte = 80 - case MsgBankMultisend: - // Multisend needs more gas based on number of recipients - config.BaseGas = 100000 + int64(config.NumMultisend)*20000 - config.GasPerByte = 100 - case "ibc_transfer": - // IBC transfers need more gas - config.BaseGas = 150000 - config.GasPerByte = 100 - case "store_code", "instantiate_contract": - // Wasm operations need significantly more gas - config.BaseGas = 400000 - config.GasPerByte = 150 - } + updateGasConfig(&config) - fmt.Printf("šŸ”„ Optimized gas settings: BaseGas=%d, GasPerByte=%d, Gas.Low=%d\n", - config.BaseGas, config.GasPerByte, config.Gas.Low) + // Then configure the gas price settings + configureGasPrice(&config) - // Read the seed phrase - mnemonic, err := os.ReadFile("seedphrase") + // Ask for seedphrase - interactively prompt the user if not found in any of the expected locations + mnemonic, err := getMnemonic() if err != nil { - return fmt.Errorf("failed to read seed phrase: %v", err) + return fmt.Errorf("failed to get mnemonic: %v", err) } // Set Bech32 prefixes and seal the configuration once @@ -323,21 +283,22 @@ func runChainTest(selection *chainregistry.ChainSelection, configMap map[string] sdkConfig.Seal() // Generate accounts + fmt.Println("\nšŸ”‘ Generating accounts...") accounts := generateAccounts(config, mnemonic) // Print account information printAccountInformation(accounts, config) // Check and adjust balances if needed + fmt.Println("\nšŸ’° Checking account balances...") if err := checkAndAdjustBalances(accounts, config); err != nil { return fmt.Errorf("failed to handle balance adjustment: %v", err) } - // Get chain ID - chainID := config.Chain // Use the chain ID from the config + // Get chain ID (use the chain ID from the config) + chainID := config.Chain - // Initialize visualizer - enableViz := true + // Initialize visualizer if enabled if enableViz { fmt.Println("\nšŸ“Š Initializing transaction visualizer...") if err := broadcast.InitVisualizer(config.Nodes.RPC); err != nil { @@ -360,14 +321,105 @@ func runChainTest(selection *chainregistry.ChainSelection, configMap map[string] return nil } +// launchTransactionBroadcasters launches the parallel broadcaster +func launchTransactionBroadcasters( + accounts []types.Account, + config types.Config, + chainID string, + distributor *bankmodule.MultiSendDistributor, + enableViz bool, +) { + // Use the new ParallelBroadcast function + startTime := time.Now() + successCount, failCount := broadcast.ParallelBroadcast(accounts, config, chainID, 1000) + duration := time.Since(startTime) + + fmt.Printf("\nāœ… Broadcasting complete: %d successful, %d failed transactions in %v (%.2f TPS)\n", + successCount, failCount, duration, float64(successCount)/duration.Seconds()) +} + +// getMnemonic gets the mnemonic from the user, either from a file or interactively +func getMnemonic() ([]byte, error) { + // First try to read from the seedphrase file in the current directory + mnemonic, err := os.ReadFile("seedphrase") + if err == nil { + return mnemonic, nil + } + + // If that fails, prompt the user for their mnemonic + fmt.Println("\nšŸ”‘ Seedphrase file not found. Please enter your seedphrase:") + fmt.Println("Warning: Your seedphrase will be visible in the terminal. Make sure no one is watching.") + + reader := bufio.NewReader(os.Stdin) + fmt.Print("> ") + mnemonicStr, err := reader.ReadString('\n') + if err != nil { + return nil, fmt.Errorf("error reading input: %w", err) + } + + // Trim whitespace and convert to bytes + mnemonicStr = strings.TrimSpace(mnemonicStr) + if mnemonicStr == "" { + return nil, errors.New("empty seedphrase provided") + } + + return []byte(mnemonicStr), nil +} + // mapToConfig converts a map[string]interface{} to types.Config func mapToConfig(configMap map[string]interface{}) types.Config { var config types.Config // Set basic fields - config.Chain = configMap["chain"].(string) - config.Denom = configMap["denom"].(string) - config.Prefix = configMap["prefix"].(string) + if chainValue, ok := configMap["chain"]; ok && chainValue != nil { + if chainStr, ok := chainValue.(string); ok && chainStr != "" { + config.Chain = chainStr + } else { + config.Chain = "cosmoshub" + fmt.Println("Warning: chain not specified or invalid in config, defaulting to 'cosmoshub'") + } + } else { + config.Chain = "cosmoshub" + fmt.Println("Warning: chain not specified in config, defaulting to 'cosmoshub'") + } + + if denomValue, ok := configMap["denom"]; ok && denomValue != nil { + if denomStr, ok := denomValue.(string); ok && denomStr != "" { + config.Denom = denomStr + } else { + config.Denom = "uatom" + fmt.Println("Warning: denom not specified or invalid in config, defaulting to 'uatom'") + } + } else { + config.Denom = "uatom" + fmt.Println("Warning: denom not specified in config, defaulting to 'uatom'") + } + + if prefixValue, ok := configMap["prefix"]; ok && prefixValue != nil { + if prefixStr, ok := prefixValue.(string); ok && prefixStr != "" { + config.Prefix = prefixStr + } else { + config.Prefix = "cosmos" + fmt.Println("Warning: prefix not specified or invalid in config, defaulting to 'cosmos'") + } + } else { + config.Prefix = "cosmos" + fmt.Println("Warning: prefix not specified in config, defaulting to 'cosmos'") + } + + // Set balance_funds flag + if balanceFundsValue, ok := configMap["balance_funds"]; ok { + if balanceFunds, ok := balanceFundsValue.(bool); ok { + config.BalanceFunds = balanceFunds + } else { + // Default to false if not a boolean + config.BalanceFunds = false + fmt.Println("Warning: balance_funds specified but not a boolean, defaulting to false") + } + } else { + // Default to false if not specified + config.BalanceFunds = false + } // Handle slip44 value for address derivation if slip44, ok := configMap["slip44"].(int); ok { @@ -477,7 +529,7 @@ func mapToConfig(configMap map[string]interface{}) types.Config { } // Before returning, update the gas config to ensure adaptive gas is enabled - updateGasConfig(&config) + configureGasPrice(&config) return config } @@ -1050,142 +1102,111 @@ func initializeDistributor(config types.Config, enableViz bool) *bankmodule.Mult return distributor } -// launchTransactionBroadcasters launches goroutines to broadcast transactions -func launchTransactionBroadcasters( - accounts []types.Account, - config types.Config, - chainID string, - distributor *bankmodule.MultiSendDistributor, - enableViz bool, -) { - var wg sync.WaitGroup - - for _, account := range accounts { - wg.Add(1) - go func(acct types.Account) { - defer wg.Done() - processAccount(acct, config, chainID, distributor, enableViz) - }(account) - } - - wg.Wait() -} - -// processAccount handles transaction broadcasting for a single account -func processAccount( - acct types.Account, - config types.Config, - chainID string, - distributor *bankmodule.MultiSendDistributor, - enableViz bool, -) { - // Get account info - sequence, accNum, err := lib.GetAccountInfo(acct.Address, config) - if err != nil { - log.Printf("Failed to get account info for %s: %v", acct.Address, err) - return +// cleanupResources cleans up resources used by the program +func cleanupResources(distributor *bankmodule.MultiSendDistributor, enableViz bool) { + fmt.Println("āœ… All transactions completed. Cleaning up resources...") + if distributor != nil { + distributor.Cleanup() } - // Prepare transaction parameters - txParams := prepareTransactionParams(acct, config, chainID, sequence, accNum, distributor) - - // Log the start of processing for this account + // Stop the visualizer if enableViz { - broadcast.LogVisualizerDebug(fmt.Sprintf("Starting transaction broadcasts for account %s (Position %d)", - acct.Address, acct.Position)) + broadcast.StopVisualizer() } - - // Broadcast transactions - successfulTxs, failedTxs, responseCodes, _ := broadcast.Loop(txParams, BatchSize, int(acct.Position)) - - // Print results - printResults(acct.Address, successfulTxs, failedTxs, responseCodes) } -// prepareTransactionParams prepares the transaction parameters for an account -func prepareTransactionParams( - acct types.Account, - config types.Config, - chainID string, - sequence uint64, - accNum uint64, - distributor *bankmodule.MultiSendDistributor, -) types.TransactionParams { - // Use the distributor to get the next RPC endpoint if available - var nodeURL string - var txMsgType string // Determine the message type based on availability of distributor - - if distributor != nil { - nodeURL = distributor.GetNextRPC() - if nodeURL == "" { - nodeURL = config.Nodes.RPC[0] // Fallback - } - - // Use MsgBankMultisend when distributor is available and multisend is enabled - if config.MsgType == "bank_send" && config.Multisend { - txMsgType = MsgBankMultisend // Use our special distributed multisend - } else { - txMsgType = config.MsgType - } - } else { - nodeURL = config.Nodes.RPC[0] // Default to first RPC - txMsgType = config.MsgType +// updateGasConfig optimizes gas settings based on the message type +func updateGasConfig(config *types.Config) { + switch config.MsgType { + case "bank_send": + // Bank send typically needs less gas + config.BaseGas = 80000 + config.GasPerByte = 80 + case MsgBankMultisend: + // Multisend needs more gas based on number of recipients + config.BaseGas = 100000 + int64(config.NumMultisend)*20000 + config.GasPerByte = 100 + case "ibc_transfer": + // IBC transfers need more gas + config.BaseGas = 150000 + config.GasPerByte = 100 + case "store_code", "instantiate_contract": + // Wasm operations need significantly more gas + config.BaseGas = 400000 + config.GasPerByte = 150 } - // Convert MsgParams struct to map - msgParamsMap := types.ConvertMsgParamsToMap(config.MsgParams) - - // Explicitly set the from_address to the account's address - // This ensures it's always set correctly even if not present in config.MsgParams - msgParamsMap["from_address"] = acct.Address - - // Add distributor to msgParams for multisend operations - if distributor != nil && txMsgType == MsgBankMultisend { - msgParamsMap["distributor"] = distributor - } + fmt.Printf("šŸ”„ Optimized gas settings: BaseGas=%d, GasPerByte=%d, Gas.Low=%d\n", + config.BaseGas, config.GasPerByte, config.Gas.Low) +} - return types.TransactionParams{ - Config: config, - NodeURL: nodeURL, - ChainID: chainID, - Sequence: sequence, - AccNum: accNum, - PrivKey: acct.PrivKey, - PubKey: acct.PubKey, - AcctAddress: acct.Address, - MsgType: txMsgType, - MsgParams: msgParamsMap, - Distributor: distributor, // Pass distributor for multisend operations +// printConfig prints the configuration details for debugging +func printConfig(config types.Config) { + fmt.Println("\nāš™ļø Configuration:") + fmt.Printf("Chain: %s\n", config.Chain) + fmt.Printf("Denom: %s\n", config.Denom) + fmt.Printf("Prefix: %s\n", config.Prefix) + fmt.Printf("Positions: %d\n", config.Positions) + fmt.Printf("Multisend: %v\n", config.Multisend) + if config.Multisend { + fmt.Printf("Num Multisend: %d\n", config.NumMultisend) } -} + fmt.Printf("Broadcast Mode: %s\n", config.BroadcastMode) + fmt.Printf("Auto-balance Funds: %v\n", config.BalanceFunds) -// printResults prints the results of transaction broadcasting -func printResults(address string, successfulTxs, failedTxs int, responseCodes map[uint32]int) { - fmt.Printf("Account %s: Successful transactions: %d, Failed transactions: %d\n", - address, successfulTxs, failedTxs) + // Gas Configuration + fmt.Println("\nGas Configuration:") + fmt.Printf("Low: %d\n", config.Gas.Low) + fmt.Printf("Medium: %d\n", config.Gas.Medium) + fmt.Printf("High: %d\n", config.Gas.High) + fmt.Printf("Precision: %d\n", config.Gas.Precision) - fmt.Println("Response code breakdown:") - for code, count := range responseCodes { - percentage := float64(count) / float64(successfulTxs+failedTxs) * 100 - fmt.Printf("Code %d: %d (%.2f%%)\n", code, count, percentage) + // Node Configuration + fmt.Println("\nNode Configuration:") + for i, node := range config.Nodes.RPC { + fmt.Printf("RPC Node %d: %s\n", i+1, node) } } -// cleanupResources cleans up resources used by the program -func cleanupResources(distributor *bankmodule.MultiSendDistributor, enableViz bool) { - fmt.Println("āœ… All transactions completed. Cleaning up resources...") - if distributor != nil { - distributor.Cleanup() +func RunPositioning(config types.Config, distributor *bankmodule.MultiSendDistributor, enableViz bool) { + // Set up chain ID + chainID := config.Chain + if chainID == "" { + log.Fatal("Chain ID is required") } - // Stop the visualizer - if enableViz { - broadcast.StopVisualizer() + log.Printf("[Registry Mode] Positioning of %d accounts on chain %s", config.Positions, chainID) + + // Generate accounts + accounts := generateAccounts(config, nil) + if len(accounts) == 0 { + log.Fatal("No accounts generated") + } + + // Assign accounts to nodes at startup + if len(config.Nodes.RPC) > 1 { + log.Printf("Found %d RPC nodes, assigning accounts...", len(config.Nodes.RPC)) + // Assign accounts to nodes using round-robin + for i, acct := range accounts { + nodeIndex := i % len(config.Nodes.RPC) + nodeURL := config.Nodes.RPC[nodeIndex] + broadcast.AssignNodeToAccount(acct.Address, nodeURL) + log.Printf("Assigned account %s to node %s", acct.Address, nodeURL) + } + log.Printf("Assigned %d accounts to %d nodes for parallel broadcasting", + len(accounts), len(config.Nodes.RPC)) + } else { + log.Printf("Only one RPC node specified, no parallel node assignment performed") } + + // Use the ParallelBroadcast function for handling transaction broadcasting + successCount, failCount := broadcast.ParallelBroadcast(accounts, config, chainID, 1000) + fmt.Printf("\nāœ… Broadcasting complete: %d successful, %d failed transactions\n", + successCount, failCount) } -// Update the GasConfig when loading from a config map to ensure adaptive gas is enabled -func updateGasConfig(config *types.Config) { +// configureGasPrice sets up gas price settings when loading from a config map +func configureGasPrice(config *types.Config) { // Enable adaptive gas by default // This ensures we're always using the most efficient gas settings if config.Gas.Medium == 0 { @@ -1226,17 +1247,3 @@ func updateGasConfig(config *types.Config) { fmt.Printf("Gas optimization enabled: Using adaptive gas strategy with base price %s%s\n", config.Gas.Price, config.Gas.Denom) } - -// printConfig prints the configuration details for debugging -func printConfig(config types.Config) { - fmt.Println("=== Registry Mode Configuration ===") - fmt.Printf("Chain: %s\n", config.Chain) - fmt.Printf("Prefix: %s\n", config.Prefix) - fmt.Printf("Denom: %s\n", config.Denom) - fmt.Printf("Slip44: %d\n", config.Slip44) - fmt.Printf("Positions: %d\n", config.Positions) - fmt.Printf("Message Type: %s\n", config.MsgType) - fmt.Printf("Multisend: %v\n", config.Multisend) - fmt.Printf("Num Multisend: %d\n", config.NumMultisend) - fmt.Println("==================================") -} diff --git a/modules/bank/multisend_distributor.go b/modules/bank/multisend_distributor.go index 4474535..4629be3 100644 --- a/modules/bank/multisend_distributor.go +++ b/modules/bank/multisend_distributor.go @@ -61,9 +61,10 @@ func (m *MultiSendDistributor) GetNextRPC() string { m.rpcIndex = (m.rpcIndex + 1) % len(m.rpcs) // If we've gone through all nodes once, consider refreshing the list - if m.rpcIndex == 0 { - // Refresh endpoints every 30 minutes or if our endpoint list is small - if time.Since(m.lastRefreshTime) > 30*time.Minute || len(m.rpcs) < 5 { + // We'll check more frequently now to find more non-registry nodes + if m.rpcIndex == 0 || m.rpcIndex%10 == 0 { + // Refresh endpoints more frequently + if time.Since(m.lastRefreshTime) > 10*time.Minute || len(m.rpcs) < 10 { go m.RefreshEndpoints() } } @@ -78,22 +79,36 @@ func (m *MultiSendDistributor) RefreshEndpoints() { fmt.Println("Refreshing RPC endpoints through peer discovery...") - // Use a reasonable timeout for discovery - discoveryTimeout := 30 * time.Second + // Use a longer timeout for more thorough discovery + discoveryTimeout := 45 * time.Second // Discover new peers - newEndpoints, err := m.peerDiscovery.DiscoverPeers(discoveryTimeout) + _, err := m.peerDiscovery.DiscoverPeers(discoveryTimeout) if err != nil { fmt.Printf("Warning: Failed to discover new peers: %v\n", err) return } - // If we found new endpoints, update our list - if len(newEndpoints) > len(m.rpcs) { - fmt.Printf("Updated RPC endpoint list: %d → %d endpoints\n", - len(m.rpcs), len(newEndpoints)) - m.rpcs = newEndpoints + // Get the prioritized list (non-registry nodes first) + prioritizedEndpoints := m.peerDiscovery.GetPrioritizedEndpoints() + + // If we found any endpoints, update our list + if len(prioritizedEndpoints) > 0 { + nonRegistryCount := len(m.peerDiscovery.GetNonRegistryNodes()) + fmt.Printf("Updated RPC endpoint list: %d endpoints (%d non-registry nodes)\n", + len(prioritizedEndpoints), nonRegistryCount) + m.rpcs = prioritizedEndpoints m.lastRefreshTime = time.Now() + + // Print a few non-registry nodes if we have them + if nonRegistryCount > 0 { + nonRegistryNodes := m.peerDiscovery.GetNonRegistryNodes() + maxDisplay := 5 + if nonRegistryCount < maxDisplay { + maxDisplay = nonRegistryCount + } + fmt.Printf("Sample non-registry nodes: %v\n", nonRegistryNodes[:maxDisplay]) + } } } @@ -101,13 +116,16 @@ func (m *MultiSendDistributor) RefreshEndpoints() { func (m *MultiSendDistributor) GetNextSeed() int64 { m.mutex.Lock() defer m.mutex.Unlock() - - seed := m.seedCounter m.seedCounter++ - return seed + return m.seedCounter +} + +// GetPeerDiscovery returns the peer discovery instance +func (m *MultiSendDistributor) GetPeerDiscovery() *peerdiscovery.PeerDiscovery { + return m.peerDiscovery } -// getOrCreateToAddress returns a recipient address or creates a deterministic one from a seed +// Helper function to get or create a recipient address func getOrCreateToAddress(specifiedAddress, randomSeed string) (sdk.AccAddress, error) { if specifiedAddress != "" { toAccAddress, err := sdk.AccAddressFromBech32(specifiedAddress) diff --git a/types/types.go b/types/types.go index 2616e43..71af293 100644 --- a/types/types.go +++ b/types/types.go @@ -141,7 +141,8 @@ type Config struct { Nodes NodesConfig `toml:"nodes"` BroadcastMode string `toml:"broadcast_mode"` Positions uint `toml:"positions"` - FromAddress string `toml:"from_address"` // Default sender address + FromAddress string `toml:"from_address"` // Default sender address + BalanceFunds bool `toml:"balance_funds"` // Whether to automatically balance funds between accounts } type MsgParams struct {