Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
509 changes: 509 additions & 0 deletions broadcast/p2p_broadcast.go

Large diffs are not rendered by default.

36 changes: 30 additions & 6 deletions broadcast/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,20 +173,36 @@ func BuildTransaction(ctx context.Context, txParams *types.TxParams) ([]byte, er
return nil, err
}

// Ensure the from_address parameter is present and valid
fromAddress, exists := txParams.MsgParams["from_address"].(string)
if !exists || fromAddress == "" {
return nil, errors.New("from_address is missing or empty in message parameters")
}

// Try to parse the from_address to ensure it's a valid bech32 address
_, err := sdk.AccAddressFromBech32(fromAddress)
if err != nil {
return nil, fmt.Errorf("invalid from_address %s: %w", fromAddress, err)
}

// Get the client context
clientCtx, err := GetClientContext(txParams.Config, txParams.NodeURL)
clientCtx, err := P2PGetClientContext(txParams.Config, txParams.NodeURL)
if err != nil {
return nil, fmt.Errorf("failed to get client context: %w", err)
}

// Set the from address in the client context
fromAddr, _ := sdk.AccAddressFromBech32(fromAddress)
clientCtx = clientCtx.WithFromAddress(fromAddr)

// Create the message based on the message type and parameters
msg, err := createMessage(txParams)
if err != nil {
return nil, fmt.Errorf("failed to create message: %w", err)
}

// Get account information for the transaction signer
fromAddress, _ := txParams.MsgParams["from_address"].(string)
fromAddress, _ = txParams.MsgParams["from_address"].(string)
accNum, accSeq, err := GetAccountInfo(ctx, clientCtx, fromAddress)
if err != nil {
return nil, fmt.Errorf("failed to get account info: %w", err)
Expand Down Expand Up @@ -626,8 +642,8 @@ func BuildAndSignTransaction(
txp.MsgParams["distributor"] = txParams.Distributor
}

// Use ClientContext with correct sequence
clientCtx, err := GetClientContext(txParams.Config, txParams.NodeURL)
// Get client context
clientCtx, err := P2PGetClientContext(txParams.Config, txParams.NodeURL)
if err != nil {
return nil, fmt.Errorf("failed to get client context: %w", err)
}
Expand Down Expand Up @@ -982,21 +998,29 @@ func ProcessTxBroadcastResult(txResponse *sdk.TxResponse, err error, nodeURL str
}
}

// SendTx is a high-level function that builds, signs, and broadcasts a transaction
func SendTx(
ctx context.Context,
txParams types.TransactionParams,
sequence uint64,
broadcastMode string,
) (*sdk.TxResponse, error) {
// Ensure from_address is set in message parameters and use it to determine the node
fromAddress, exists := txParams.MsgParams["from_address"].(string)
if exists && fromAddress != "" && len(txParams.Config.Nodes.RPC) > 0 {
// We'll implement custom node selection logic in the registry's prepareTransactionParams
// This allows for more control over node selection at the transaction preparation stage
fmt.Printf("📝 Transaction for account %s using node %s\n",
fromAddress, txParams.NodeURL)
}

// Build and sign the transaction
txBytes, err := BuildAndSignTransaction(ctx, txParams, sequence, nil)
if err != nil {
return nil, fmt.Errorf("failed to build and sign transaction: %w", err)
}

// Get client context for broadcast
clientCtx, err := GetClientContext(txParams.Config, txParams.NodeURL)
clientCtx, err := P2PGetClientContext(txParams.Config, txParams.NodeURL)
if err != nil {
return nil, fmt.Errorf("failed to get client context: %w", err)
}
Expand Down
168 changes: 168 additions & 0 deletions broadcast/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@ package broadcast

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

rpchttp "github.com/cometbft/cometbft/rpc/client/http"
types "github.com/somatic-labs/meteorite/types"
Expand All @@ -12,6 +18,7 @@ import (
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
sdk "github.com/cosmos/cosmos-sdk/types"
authtx "github.com/cosmos/cosmos-sdk/x/auth/tx"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
Expand Down Expand Up @@ -136,3 +143,164 @@ func MakeEncodingConfig() codec.ProtoCodecMarshaler {

return marshaler
}

// GetHTTPClientContext creates a client context for HTTP transaction handling
// This is the HTTP-specific version of GetClientContext
func GetHTTPClientContext(config types.Config, nodeURL string) (client.Context, error) {
var clientContext client.Context

// Create keyring (use in-memory keyring for simplicity)
kr := keyring.NewInMemory(nil)

// Create client context
clientContext = client.Context{
ChainID: config.Chain,
NodeURI: nodeURL,
Keyring: kr,
BroadcastMode: "sync", // This will be overridden by the broadcast function
Simulate: false,
GenerateOnly: false,
Offline: false,
SkipConfirm: true,
}

return clientContext, nil
}

// GetKeyringFromBackend gets a keyring from a backend
func GetKeyringFromBackend(backend string) (keyring.Keyring, error) {
var kr keyring.Keyring
var err error

homeDir, err := os.UserHomeDir()
if err != nil {
return nil, fmt.Errorf("failed to get home directory: %v", err)
}

switch backend {
case "test":
kr, err = keyring.New("cosmos", keyring.BackendTest, homeDir, os.Stdin, nil)
case "file":
kr, err = keyring.New("cosmos", keyring.BackendFile, homeDir, os.Stdin, nil)
case "os":
kr, err = keyring.New("cosmos", keyring.BackendOS, homeDir, os.Stdin, nil)
case "memory":
kr = keyring.NewInMemory(nil)
err = nil
default:
kr = keyring.NewInMemory(nil)
err = nil
}

if err != nil {
return nil, fmt.Errorf("failed to create keyring: %v", err)
}

return kr, nil
}

// IsNodeReachable checks if a node is reachable
func IsNodeReachable(nodeURL string) bool {
if nodeURL == "" {
return false
}

// Create a new HTTP client with a timeout
client := &http.Client{
Timeout: 5 * time.Second,
}

// Make a health check request
healthURL := nodeURL
if !strings.HasSuffix(nodeURL, "/health") {
healthURL = strings.TrimSuffix(nodeURL, "/") + "/health"
}

req, err := http.NewRequest(http.MethodGet, healthURL, nil)
if err != nil {
return false
}

resp, err := client.Do(req)
if err != nil {
return false
}
defer resp.Body.Close()

// Read the response body
_, err = io.ReadAll(resp.Body)
if err != nil {
return false
}

return resp.StatusCode == http.StatusOK
}

// GetNodeInfo gets information about a node
func GetNodeInfo(nodeURL string) (types.NodeInfo, error) {
var nodeInfo types.NodeInfo

if nodeURL == "" {
return nodeInfo, errors.New("node URL is empty")
}

// Create a new HTTP client with a timeout
client := &http.Client{
Timeout: 5 * time.Second,
}

// Make a status request
statusURL := nodeURL
if !strings.HasSuffix(nodeURL, "/status") {
statusURL = strings.TrimSuffix(nodeURL, "/") + "/status"
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, statusURL, nil)
if err != nil {
return nodeInfo, fmt.Errorf("failed to create request: %v", err)
}

resp, err := client.Do(req)
if err != nil {
return nodeInfo, fmt.Errorf("failed to make request: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nodeInfo, fmt.Errorf("status request failed with status code: %d", resp.StatusCode)
}

// Read the response body
body, err := io.ReadAll(resp.Body)
if err != nil {
return nodeInfo, fmt.Errorf("failed to read response body: %v", err)
}

// Parse the response
var statusResp map[string]interface{}
err = json.Unmarshal(body, &statusResp)
if err != nil {
return nodeInfo, fmt.Errorf("failed to parse response: %v", err)
}

// Extract the node info
result, ok := statusResp["result"].(map[string]interface{})
if !ok {
return nodeInfo, errors.New("invalid response format")
}

nodeInfoData, ok := result["node_info"].(map[string]interface{})
if !ok {
return nodeInfo, errors.New("invalid node info format")
}

// Get the network value from the node info data
if network, ok := nodeInfoData["network"].(string); ok {
nodeInfo.Network = network
}

return nodeInfo, nil
}
2 changes: 1 addition & 1 deletion lib/chainregistry/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func SelectChainInteractive(registry *Registry) (*ChainSelection, error) {
discovery := peerdiscovery.New(initialEndpoints)

// Discovery timeout (adjust as needed)
discoveryTimeout := 45 * time.Second
discoveryTimeout := 2 * time.Minute // Increased from 45 seconds to 2 minutes

// Discover additional peers
allEndpoints, err := discovery.DiscoverPeers(discoveryTimeout)
Expand Down
82 changes: 71 additions & 11 deletions lib/peerdiscovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import (

const (
// DefaultTimeout is the default timeout for RPC requests
DefaultTimeout = 5 * time.Second
DefaultTimeout = 10 * time.Second

// MaxRetries is the number of times to retry connecting to a node
MaxRetries = 3

// MaxConcurrentChecks is the maximum number of concurrent RPC checks
MaxConcurrentChecks = 50
Expand Down Expand Up @@ -57,6 +60,15 @@ func (pd *PeerDiscovery) DiscoverPeers(timeout time.Duration) ([]string, error)
// Start a wait group to track all goroutines
var wg sync.WaitGroup

// Keep track of initial registry endpoints
registryEndpoints := make(map[string]bool)
for _, endpoint := range pd.initialEndpoints {
normalized := normalizeEndpoint(endpoint)
if normalized != "" {
registryEndpoints[normalized] = true
}
}

// Process the initial endpoints
for _, endpoint := range pd.initialEndpoints {
endpoint = normalizeEndpoint(endpoint)
Expand Down Expand Up @@ -86,13 +98,35 @@ func (pd *PeerDiscovery) DiscoverPeers(timeout time.Duration) ([]string, error)
fmt.Println("Peer discovery timed out or was canceled.")
}

// Return the discovered endpoints
// Prepare the results with prioritization
pd.resultsMutex.RLock()
defer pd.resultsMutex.RUnlock()

// Make a copy to prevent external modification
results := make([]string, len(pd.openRPCEndpoints))
copy(results, pd.openRPCEndpoints)
// Separate discovered endpoints into registry and non-registry
var nonRegistryEndpoints []string
var regEndpoints []string

for _, endpoint := range pd.openRPCEndpoints {
if registryEndpoints[endpoint] {
regEndpoints = append(regEndpoints, endpoint)
} else {
nonRegistryEndpoints = append(nonRegistryEndpoints, endpoint)
}
}

// Prioritize non-registry endpoints, but include registry endpoints at the end
fmt.Printf("Found %d non-registry endpoints and %d registry endpoints.\n",
len(nonRegistryEndpoints), len(regEndpoints))

// Combine results with non-registry endpoints first
results := append(nonRegistryEndpoints, regEndpoints...)

// If we found no endpoints, try to return some registry endpoints as fallback
if len(results) == 0 && len(pd.initialEndpoints) > 0 {
fmt.Println("No endpoints discovered, using registry endpoints as fallback.")
results = make([]string, len(pd.initialEndpoints))
copy(results, pd.initialEndpoints)
}

return results, nil
}
Expand Down Expand Up @@ -142,17 +176,43 @@ func (pd *PeerDiscovery) checkNode(nodeAddr string) {
}
}

// Create a client with timeout
client, err := http.NewWithTimeout(nodeAddr, "websocket", uint(DefaultTimeout.Milliseconds()))
if err != nil {
fmt.Printf("Failed to create client for %s: %v\n", nodeAddr, err)
// Try connecting with retries
var client *http.HTTP
var err error
var connected bool

for retry := 0; retry < MaxRetries; retry++ {
// If this is a retry, wait a bit before trying again
if retry > 0 {
time.Sleep(time.Duration(retry) * time.Second)
fmt.Printf("Retry %d/%d connecting to %s\n", retry, MaxRetries, nodeAddr)
}

// Create a client with timeout
client, err = http.NewWithTimeout(nodeAddr, "websocket", uint(DefaultTimeout.Milliseconds()))
if err != nil {
fmt.Printf("Attempt %d: Failed to create client for %s: %v\n", retry+1, nodeAddr, err)
continue
}

// Verify this is a working RPC endpoint
_, err = client.Status(pd.ctx)
if err == nil {
connected = true
break
}
fmt.Printf("Attempt %d: Failed to get status from %s: %v\n", retry+1, nodeAddr, err)
}

if !connected {
fmt.Printf("Failed to connect to %s after %d attempts\n", nodeAddr, MaxRetries)
return
}

// Verify this is a working RPC endpoint
// Get status now that we're connected
status, err := client.Status(pd.ctx)
if err != nil {
fmt.Printf("Failed to get status from %s: %v\n", nodeAddr, err)
fmt.Printf("Failed to get status from %s after successful connection: %v\n", nodeAddr, err)
return
}

Expand Down
Loading
Loading