Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package broadcast
import (
"context"
"fmt"
"sync"
"time"

coretypes "github.com/cometbft/cometbft/rpc/core/types"
Expand All @@ -15,7 +16,13 @@ import (
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
)

// Add these at the top of the file
// Global variables for account locking and node assignment
var (
accountLocks sync.Map // map[string]*sync.Mutex for account serialization
accountNodeMap sync.Map // map[string]string for account-to-node mapping
accountNodeMapMux sync.Mutex
)

type TimingMetrics struct {
PrepStart time.Time
SignStart time.Time
Expand Down Expand Up @@ -61,14 +68,17 @@ func init() {
banktypes.RegisterInterfaces(cdc.InterfaceRegistry())
}

// Transaction broadcasts the transaction bytes to the given RPC endpoint.
func Transaction(txBytes []byte, rpcEndpoint string) (*coretypes.ResultBroadcastTx, error) {
client, err := GetClient(rpcEndpoint)
if err != nil {
return nil, err
// assignNodeToAccount assigns a node to an account if not already assigned
func assignNodeToAccount(acctAddress, nodeURL string) string {
accountNodeMapMux.Lock()
defer accountNodeMapMux.Unlock()

if assignedNode, ok := accountNodeMap.Load(acctAddress); ok {
return assignedNode.(string)
}

return client.Transaction(txBytes)
accountNodeMap.Store(acctAddress, nodeURL)
return nodeURL
}

// Loop handles the main transaction broadcasting logic
Expand All @@ -82,8 +92,23 @@ func Loop(
responseCodes = make(map[uint32]int)
sequence := txParams.Sequence

// Assign this account to the provided node if not already assigned
nodeURL := assignNodeToAccount(txParams.AcctAddress, txParams.NodeURL)
if nodeURL != txParams.NodeURL {
fmt.Printf("[POS-%d] Account %s reassigned to node %s (original: %s)\n",
position, txParams.AcctAddress, nodeURL, txParams.NodeURL)
}
txParams.NodeURL = nodeURL

// Get or create lock for this account
lock, _ := accountLocks.LoadOrStore(txParams.AcctAddress, &sync.Mutex{})
mutex := lock.(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()

// Log the start of broadcasting for this position
LogVisualizerDebug(fmt.Sprintf("Starting broadcasts for position %d (batchSize: %d)", position, batchSize))
LogVisualizerDebug(fmt.Sprintf("Starting broadcasts for position %d (batchSize: %d) on node %s",
position, batchSize, txParams.NodeURL))

for i := 0; i < batchSize; i++ {
currentSequence := sequence
Expand All @@ -107,7 +132,7 @@ func Loop(
// Update visualizer with failed tx
UpdateVisualizerStats(0, 1, txLatency)

if resp != nil && resp.Code == 32 {
if resp != nil && resp.Code == 32 { // Sequence mismatch
newSeq, success, newResp := handleSequenceMismatch(txParams, position, sequence, err)
sequence = newSeq
if success {
Expand All @@ -117,8 +142,16 @@ func Loop(
// Update visualizer with successful tx after sequence recovery
UpdateVisualizerStats(1, 0, metrics.Complete.Sub(metrics.PrepStart))
}
// Record gas result even on failure for fee adjustment
GetGasStrategyManager().RecordTransactionResult(
txParams.NodeURL, success, 0, txParams.MsgType, txParams.MsgParams["calculated_gas_amount"].(uint64), err.Error())
continue
Comment on lines +146 to 148
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Validate key existence before casting to uint64.
Casting txParams.MsgParams["calculated_gas_amount"] to uint64 without checking might cause a panic if the key is missing or if the type is different.


🏁 Script executed:

#!/bin/bash
# This script searches for potential calls to `RecordTransactionResult` where
# "calculated_gas_amount" may not be set in `txParams.MsgParams`.

rg -A 5 "RecordTransactionResult" 

Length of output: 1886


Action: Validate key existence of "calculated_gas_amount" before type casting

Before calling RecordTransactionResult, please ensure that the txParams.MsgParams map actually contains the calculated_gas_amount key and that its value is of type uint64. Directly casting without this check can lead to a panic if the key is missing or holds an unexpected type.

For example, update:

GetGasStrategyManager().RecordTransactionResult(
	txParams.NodeURL, success, 0, txParams.MsgType, txParams.MsgParams["calculated_gas_amount"].(uint64), err.Error())
continue

to something like:

gasAmount, ok := txParams.MsgParams["calculated_gas_amount"].(uint64)
if !ok {
	// Handle the error case - either assign a default value or log the error
	// For example:
	errMsg := "calculated_gas_amount missing or not a uint64"
	GetGasStrategyManager().RecordTransactionResult(
		txParams.NodeURL, success, 0, txParams.MsgType, 0, errMsg)
	continue
}
GetGasStrategyManager().RecordTransactionResult(
	txParams.NodeURL, success, 0, txParams.MsgType, gasAmount, err.Error())
continue

This change will make the code more resilient against potential panics due to unexpected input.

}

// Record failure for gas strategy adjustment
gasLimit, _ := txParams.MsgParams["calculated_gas_amount"].(uint64)
GetGasStrategyManager().RecordTransactionResult(
txParams.NodeURL, false, 0, txParams.MsgType, gasLimit, err.Error())
continue
}

Expand All @@ -127,6 +160,11 @@ func Loop(
responseCodes[resp.Code]++
sequence++

// Record success for gas strategy
gasLimit, _ := txParams.MsgParams["calculated_gas_amount"].(uint64)
GetGasStrategyManager().RecordTransactionResult(
txParams.NodeURL, true, 0, txParams.MsgType, gasLimit, "")

// Update visualizer with successful tx
UpdateVisualizerStats(1, 0, txLatency)
}
Expand Down
75 changes: 34 additions & 41 deletions broadcast/gas_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,24 @@ package broadcast
import (
"context"
"fmt"
"strings"
"sync"
"time"

rpchttp "github.com/cometbft/cometbft/rpc/client/http"
"github.com/somatic-labs/meteorite/lib"
"github.com/somatic-labs/meteorite/types"
)

// NodeGasCapabilities tracks whether a node accepts 0-gas transactions
// NodeGasCapabilities tracks gas-related capabilities for a node
type NodeGasCapabilities struct {
NodeURL string
AcceptsZeroGas bool
MinAcceptableGas int64
MinGasPrice float64 // Minimum gas price required by this node
LastChecked time.Time
SuccessfulTxCount int
FailedTxCount int
// Tracking gas usage per message type
GasUsageByMsgType map[string]*GasUsageStats
}

Expand All @@ -27,7 +29,7 @@ type GasUsageStats struct {
SuccessCount int64
TotalGasUsed int64
AverageGasUsed int64
RecentGasValues []int64 // Store recent gas values for better averaging
RecentGasValues []int64
MaxGasUsed int64
MinGasUsed int64
FailedCount int64
Expand All @@ -53,11 +55,11 @@ func (g *GasStrategyManager) GetNodeCapabilities(nodeURL string) *NodeGasCapabil

caps, exists := g.nodeCapabilities[nodeURL]
if !exists {
// Return default capabilities if node hasn't been tested yet
return &NodeGasCapabilities{
NodeURL: nodeURL,
AcceptsZeroGas: false,
MinAcceptableGas: 0,
MinGasPrice: 0.0, // Default to 0 until updated
LastChecked: time.Time{},
GasUsageByMsgType: make(map[string]*GasUsageStats),
}
Expand All @@ -74,7 +76,7 @@ func (g *GasStrategyManager) UpdateNodeCapabilities(caps *NodeGasCapabilities) {
}

// RecordTransactionResult records the result of a transaction for a node
func (g *GasStrategyManager) RecordTransactionResult(nodeURL string, success bool, gasUsed int64, msgType string) {
func (g *GasStrategyManager) RecordTransactionResult(nodeURL string, success bool, gasUsed int64, msgType string, txGasLimit uint64, requiredFee string) {
g.mutex.Lock()
defer g.mutex.Unlock()

Expand All @@ -84,63 +86,68 @@ func (g *GasStrategyManager) RecordTransactionResult(nodeURL string, success boo
NodeURL: nodeURL,
AcceptsZeroGas: false,
MinAcceptableGas: gasUsed,
MinGasPrice: 0.0,
LastChecked: time.Now(),
GasUsageByMsgType: make(map[string]*GasUsageStats),
}
}

// Ensure GasUsageByMsgType is initialized
if caps.GasUsageByMsgType == nil {
caps.GasUsageByMsgType = make(map[string]*GasUsageStats)
}

// Get or create stats for this message type
stats, exists := caps.GasUsageByMsgType[msgType]
if !exists {
stats = &GasUsageStats{
MinGasUsed: gasUsed,
MaxGasUsed: gasUsed,
RecentGasValues: make([]int64, 0, 10), // Keep last 10 gas values
RecentGasValues: make([]int64, 0, 10),
}
caps.GasUsageByMsgType[msgType] = stats
}

if success {
caps.SuccessfulTxCount++

// Update gas usage statistics for this message type
stats.SuccessCount++
stats.TotalGasUsed += gasUsed
stats.AverageGasUsed = stats.TotalGasUsed / stats.SuccessCount

// Update min/max gas used
if gasUsed < stats.MinGasUsed {
stats.MinGasUsed = gasUsed
}
if gasUsed > stats.MaxGasUsed {
stats.MaxGasUsed = gasUsed
}

// Store recent gas values for better averaging (keep last 10)
if len(stats.RecentGasValues) >= 10 {
stats.RecentGasValues = stats.RecentGasValues[1:] // Remove oldest
stats.RecentGasValues = stats.RecentGasValues[1:]
}
stats.RecentGasValues = append(stats.RecentGasValues, gasUsed)

// If successful with lower gas than we thought was required for this node,
// update our minimum gas estimate
if gasUsed < caps.MinAcceptableGas || caps.MinAcceptableGas == 0 {
caps.MinAcceptableGas = gasUsed
}

// If a zero-gas transaction was successful, mark this node as accepting zero gas
if gasUsed == 0 {
caps.AcceptsZeroGas = true
fmt.Printf("Node %s accepts zero-gas transactions! Optimizing future transactions.\n", nodeURL)
}
} else {
caps.FailedTxCount++
stats.FailedCount++

// Update MinGasPrice if the failure is due to insufficient fees
if strings.Contains(requiredFee, "insufficient fee") {
requiredAmount, requiredDenom, err := lib.ExtractRequiredFee(requiredFee)
if err == nil && requiredDenom == "uatone" && txGasLimit > 0 {
requiredGasPrice := float64(requiredAmount) / float64(txGasLimit)
if requiredGasPrice > caps.MinGasPrice {
caps.MinGasPrice = requiredGasPrice
fmt.Printf("Updated MinGasPrice for node %s to %f based on required fee %d\n",
nodeURL, caps.MinGasPrice, requiredAmount)
}
}
}
}

caps.LastChecked = time.Now()
Expand All @@ -154,21 +161,19 @@ func (g *GasStrategyManager) GetAverageGasForMsgType(nodeURL, msgType string) in

caps, exists := g.nodeCapabilities[nodeURL]
if !exists || caps.GasUsageByMsgType == nil {
return 0 // No data available
return 0
}

stats, exists := caps.GasUsageByMsgType[msgType]
if !exists || stats.SuccessCount == 0 {
return 0 // No data for this message type
return 0
}

// If we have recent values, calculate a weighted average
if len(stats.RecentGasValues) > 0 {
// Calculate weighted average with more weight to recent values
total := int64(0)
weights := 0
for i, gas := range stats.RecentGasValues {
weight := i + 1 // More weight to more recent values
weight := i + 1
total += gas * int64(weight)
weights += weight
}
Expand All @@ -179,25 +184,20 @@ func (g *GasStrategyManager) GetAverageGasForMsgType(nodeURL, msgType string) in
}

// GetRecommendedGasForMsgType returns the recommended gas for a message type
// It takes into account average usage, success rate, and applies a safety buffer
func (g *GasStrategyManager) GetRecommendedGasForMsgType(nodeURL, msgType string, baseGas int64) int64 {
avgGas := g.GetAverageGasForMsgType(nodeURL, msgType)

// If we have historical data, use it with a safety buffer
if avgGas > 0 {
// Apply a 20% safety buffer to the average
safetyBuffer := float64(1.2)
recommendedGas := int64(float64(avgGas) * safetyBuffer)

// Always use at least the base gas
if recommendedGas < baseGas {
recommendedGas = baseGas
}

return recommendedGas
}

// If no historical data, return the base gas
return baseGas
}

Expand All @@ -207,46 +207,39 @@ func (g *GasStrategyManager) TestZeroGasTransaction(ctx context.Context, nodeURL
defer g.mutex.Unlock()

caps, exists := g.nodeCapabilities[nodeURL]
// If we already know this node or it was checked recently, don't test again
if exists && time.Since(caps.LastChecked) < 1*time.Hour {
return caps.AcceptsZeroGas
}

// Create a new capability entry if it doesn't exist
if !exists {
caps = &NodeGasCapabilities{
NodeURL: nodeURL,
AcceptsZeroGas: false,
MinGasPrice: 0.0,
LastChecked: time.Now(),
GasUsageByMsgType: make(map[string]*GasUsageStats),
}
}

// We'll test a zero-gas transaction to see if the node accepts it
fmt.Printf("Testing if node %s accepts zero-gas transactions...\n", nodeURL)

// Connect to the node
client, err := rpchttp.New(nodeURL, "/websocket")
if err != nil {
fmt.Printf("Failed to connect to node %s: %v\n", nodeURL, err)
g.nodeCapabilities[nodeURL] = caps
return false
}

// Check if the node is up first
_, err = client.Status(ctx)
if err != nil {
fmt.Printf("Node %s is unreachable: %v\n", nodeURL, err)
g.nodeCapabilities[nodeURL] = caps
return false
}

// For now, we'll just set a flag to indicate that we should try a zero-gas transaction with this node
// The actual test will happen when we send a real transaction
caps.LastChecked = time.Now()
g.nodeCapabilities[nodeURL] = caps

return false // Default to false until proven otherwise
return false // Default until proven
}

// DetermineOptimalGasForNode determines the optimal gas limit for a specific node
Expand All @@ -257,36 +250,36 @@ func (g *GasStrategyManager) DetermineOptimalGasForNode(
msgType string,
canUseZeroGas bool,
) uint64 {
// Get node capabilities
caps := g.GetNodeCapabilities(nodeURL)

// If this node accepts zero gas and we're allowed to use it, use zero gas
if caps.AcceptsZeroGas && canUseZeroGas {
fmt.Printf("Using zero gas for node %s which accepts zero-gas transactions\n", nodeURL)
return 0
}

// Try to get a recommended gas amount based on historical data
recommendedGas := g.GetRecommendedGasForMsgType(nodeURL, msgType, int64(baseGasLimit))
if recommendedGas > 0 {
fmt.Printf("Using optimized gas amount for node %s and msg type %s: %d (based on historical data)\n",
nodeURL, msgType, recommendedGas)
return uint64(recommendedGas)
}

// If we have a known minimum gas value for this node, use it if it's lower than the base gas
if caps.MinAcceptableGas > 0 && uint64(caps.MinAcceptableGas) < baseGasLimit {
gasLimit := uint64(caps.MinAcceptableGas)
fmt.Printf("Using optimized gas amount for node %s: %d (down from %d)\n",
nodeURL, gasLimit, baseGasLimit)
return gasLimit
}

// Otherwise use the base gas limit
return baseGasLimit
}

// global instance of the gas strategy manager
// GetMinGasPrice returns the minimum gas price for a node
func (g *GasStrategyManager) GetMinGasPrice(nodeURL string) float64 {
caps := g.GetNodeCapabilities(nodeURL)
return caps.MinGasPrice
}

var (
globalGasManager *GasStrategyManager
globalGasManagerOnce sync.Once
Expand Down
2 changes: 1 addition & 1 deletion broadcast/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func SendTransactionViaGRPC(
txParams.Sequence = sequence

// Build and sign the transaction
txBytes, err := BuildAndSignTransaction(ctx, txParams, sequence, encodingConfig)
txBytes, err := BuildAndSignTransaction(ctx, txParams, sequence, encodingConfig.TxConfig)
if err != nil {
return nil, "", fmt.Errorf("failed to build transaction: %w", err)
}
Expand Down
Loading