diff --git a/pkg/eth/eth.go b/pkg/eth/eth.go index 28491baf..d2316f5a 100644 --- a/pkg/eth/eth.go +++ b/pkg/eth/eth.go @@ -224,7 +224,7 @@ func (eth *EthService) startEthDataManager(ctx context.Context) error { defer proposalOutcomeSub.Unsubscribe() // interval to clean refresh all indexed data - ticker := time.NewTicker(1 * time.Hour) + ticker := time.NewTicker(6 * time.Hour) for { select { diff --git a/pkg/integration_tests/00_blocks_test.go b/pkg/integration_tests/00_blocks_test.go index 2dfcd5a2..1b96cf18 100644 --- a/pkg/integration_tests/00_blocks_test.go +++ b/pkg/integration_tests/00_blocks_test.go @@ -19,7 +19,7 @@ func TestBlockCreation(t *testing.T) { _, err := sdk.Core.Ping(ctx, connect.NewRequest(&corev1.PingRequest{})) assert.NoError(t, err) - err = utils.WaitForDevnetHealthy(30 * time.Second) + err = utils.WaitForDevnetHealthy() assert.NoError(t, err) var blockOne *corev1.Block diff --git a/pkg/integration_tests/00_manage_entity_test.go b/pkg/integration_tests/00_manage_entity_test.go index 1c1f28a1..fbc81b1d 100644 --- a/pkg/integration_tests/00_manage_entity_test.go +++ b/pkg/integration_tests/00_manage_entity_test.go @@ -19,7 +19,7 @@ func TestEntityManager(t *testing.T) { ctx := context.Background() sdk := utils.DiscoveryOne - err := utils.WaitForDevnetHealthy(60 * time.Second) + err := utils.WaitForDevnetHealthy() assert.NoError(t, err) // Generate a test private key diff --git a/pkg/integration_tests/00_slash_proposal_test.go b/pkg/integration_tests/00_slash_proposal_test.go index 8ce714e7..5840ee2d 100644 --- a/pkg/integration_tests/00_slash_proposal_test.go +++ b/pkg/integration_tests/00_slash_proposal_test.go @@ -35,7 +35,7 @@ func TestCanRetrieveSlashProposal(t *testing.T) { wsRpcUrl = "ws" + strings.TrimPrefix(wsRpcUrl, "http") } - err := utils.WaitForDevnetHealthy(30 * time.Second) + err := utils.WaitForDevnetHealthy() require.NoError(t, err) ethrpc, err := ethclient.Dial(wsRpcUrl) diff --git a/pkg/integration_tests/10_upload_stream_test.go b/pkg/integration_tests/10_upload_stream_test.go index 0c8f5125..abee56c1 100644 --- a/pkg/integration_tests/10_upload_stream_test.go +++ b/pkg/integration_tests/10_upload_stream_test.go @@ -25,7 +25,7 @@ import ( func TestUploadStream(t *testing.T) { ctx := context.Background() - require.NoError(t, utils.WaitForDevnetHealthy(30*time.Second)) + require.NoError(t, utils.WaitForDevnetHealthy()) serverAddr := "node3.oap.devnet" privKeyPath := "./assets/demo_key.txt" diff --git a/pkg/integration_tests/12_rewards_test.go b/pkg/integration_tests/12_rewards_test.go index bebea62e..a6536418 100644 --- a/pkg/integration_tests/12_rewards_test.go +++ b/pkg/integration_tests/12_rewards_test.go @@ -3,7 +3,6 @@ package integration_tests import ( "context" "testing" - "time" v1 "github.com/OpenAudio/go-openaudio/pkg/api/core/v1" "github.com/OpenAudio/go-openaudio/pkg/common" @@ -17,7 +16,7 @@ func TestRewardsLifecycle(t *testing.T) { nodeUrl := utils.DiscoveryOneRPC // Wait for devnet to be ready - if err := utils.WaitForDevnetHealthy(30 * time.Second); err != nil { + if err := utils.WaitForDevnetHealthy(); err != nil { t.Fatalf("Devnet not ready: %v", err) } diff --git a/pkg/integration_tests/90_deregistration_test.go b/pkg/integration_tests/90_deregistration_test.go index acceef8c..972ef25d 100644 --- a/pkg/integration_tests/90_deregistration_test.go +++ b/pkg/integration_tests/90_deregistration_test.go @@ -45,7 +45,7 @@ func TestDeregisterNode(t *testing.T) { wsRpcUrl = "ws" + strings.TrimPrefix(wsRpcUrl, "http") } - err := utils.WaitForDevnetHealthy(30 * time.Second) + err := utils.WaitForDevnetHealthy() require.NoError(t, err) ethrpc, err := ethclient.Dial(wsRpcUrl) diff --git a/pkg/integration_tests/utils/nodes.go b/pkg/integration_tests/utils/nodes.go index 8c78a961..79b24f19 100644 --- a/pkg/integration_tests/utils/nodes.go +++ b/pkg/integration_tests/utils/nodes.go @@ -3,6 +3,7 @@ package utils import ( "context" "crypto/tls" + "encoding/json" "errors" "net/http" "os" @@ -69,8 +70,12 @@ func EnsureProtocol(endpoint string) string { return endpoint } -func WaitForDevnetHealthy(timeout time.Duration) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) +func WaitForDevnetHealthy(timeout ...time.Duration) error { + timeoutDuration := 60 * time.Second + if len(timeout) > 0 { + timeoutDuration = timeout[0] + } + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) defer cancel() nodes := []*sdk.OpenAudioSDK{ @@ -80,14 +85,23 @@ func WaitForDevnetHealthy(timeout time.Duration) error { ContentThree, } + nodeAddresses := []string{ + ContentOneRPC, + ContentTwoRPC, + ContentThreeRPC, + } + ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() + client := NewTestHTTPClient() + for { select { case <-ctx.Done(): return errors.New("timed out waiting for devnet to be ready") case <-ticker.C: + // Check core services are ready allReady := true for _, n := range nodes { status, err := n.Core.GetStatus(context.Background(), connect.NewRequest(&corev1.GetStatusRequest{})) @@ -99,7 +113,48 @@ func WaitForDevnetHealthy(timeout time.Duration) error { break } } - if allReady { + if !allReady { + continue + } + + // Check mediorum services have wallets registered + allMediorumReady := true + var healthResponse struct { + Storage struct { + WalletIsRegistered bool `json:"wallet_is_registered"` + } `json:"storage"` + } + + for _, addr := range nodeAddresses { + // Ensure https:// protocol + baseURL := addr + if !strings.HasPrefix(baseURL, "https://") && !strings.HasPrefix(baseURL, "http://") { + baseURL = "https://" + baseURL + } else if strings.HasPrefix(baseURL, "http://") { + baseURL = strings.Replace(baseURL, "http://", "https://", 1) + } + + req, err := http.NewRequestWithContext(ctx, "GET", baseURL+"/health-check", nil) + if err != nil { + allMediorumReady = false + break + } + + resp, err := client.Do(req) + if err != nil { + allMediorumReady = false + break + } + + if resp.StatusCode != 200 || json.NewDecoder(resp.Body).Decode(&healthResponse) != nil || !healthResponse.Storage.WalletIsRegistered { + resp.Body.Close() + allMediorumReady = false + break + } + resp.Body.Close() + } + + if allReady && allMediorumReady { return nil } } diff --git a/pkg/mediorum/server/server.go b/pkg/mediorum/server/server.go index 0b18498e..ecf18256 100644 --- a/pkg/mediorum/server/server.go +++ b/pkg/mediorum/server/server.go @@ -665,7 +665,7 @@ func (ss *MediorumServer) startPprofServer(ctx context.Context) error { } func (ss *MediorumServer) refreshPeersAndSigners(ctx context.Context) error { - interval := 30 * time.Minute + interval := 10 * time.Minute if os.Getenv("OPENAUDIO_ENV") == "dev" { interval = 10 * time.Second } @@ -715,7 +715,29 @@ func (ss *MediorumServer) refreshPeersAndSigners(ctx context.Context) error { ss.Config.Peers = peers ss.Config.Signers = signers - ss.logger.Info("updated peers and signers dynamically", zap.Int("peers", len(peers)), zap.Int("signers", len(signers))) + // Update WalletIsRegistered based on updated peers list + ss.Config.WalletIsRegistered = false + for _, peer := range peers { + if strings.EqualFold(ss.Config.Self.Wallet, peer.Wallet) && strings.EqualFold(ss.Config.Self.Host, peer.Host) { + ss.Config.WalletIsRegistered = true + break + } + } + + // Log detailed info if not registered to help diagnose issues + if !ss.Config.WalletIsRegistered { + peerHosts := make([]string, len(peers)) + for i, p := range peers { + peerHosts[i] = p.Host + } + ss.logger.Warn("node not found in registered peers list", + zap.String("self_host", ss.Config.Self.Host), + zap.String("self_wallet", ss.Config.Self.Wallet), + zap.Int("total_peers", len(peers)), + zap.Strings("peer_hosts", peerHosts)) + } + + ss.logger.Info("updated peers and signers dynamically", zap.Int("peers", len(peers)), zap.Int("signers", len(signers)), zap.Bool("wallet_is_registered", ss.Config.WalletIsRegistered)) case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/uptime/uptime.go b/pkg/uptime/uptime.go index 2d5a6ed0..1063e047 100644 --- a/pkg/uptime/uptime.go +++ b/pkg/uptime/uptime.go @@ -187,7 +187,7 @@ func toPeers(endpoints []*ethv1.ServiceEndpoint) []registrar.Peer { } func (u *Uptime) startPeerRefresher() { - interval := 30 * time.Minute + interval := 10 * time.Minute if os.Getenv("OPENAUDIO_ENV") == "dev" { interval = 10 * time.Second }