From f091e2cb07b0b3cf1cc2db0846882a400a2a5dcb Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Tue, 10 Feb 2026 10:26:26 -0800 Subject: [PATCH 1/6] Update eth fetching intervals --- pkg/eth/eth.go | 2 +- pkg/mediorum/server/server.go | 2 +- pkg/uptime/uptime.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/eth/eth.go b/pkg/eth/eth.go index 28491baf..d2316f5a 100644 --- a/pkg/eth/eth.go +++ b/pkg/eth/eth.go @@ -224,7 +224,7 @@ func (eth *EthService) startEthDataManager(ctx context.Context) error { defer proposalOutcomeSub.Unsubscribe() // interval to clean refresh all indexed data - ticker := time.NewTicker(1 * time.Hour) + ticker := time.NewTicker(6 * time.Hour) for { select { diff --git a/pkg/mediorum/server/server.go b/pkg/mediorum/server/server.go index 0b18498e..249e4a13 100644 --- a/pkg/mediorum/server/server.go +++ b/pkg/mediorum/server/server.go @@ -665,7 +665,7 @@ func (ss *MediorumServer) startPprofServer(ctx context.Context) error { } func (ss *MediorumServer) refreshPeersAndSigners(ctx context.Context) error { - interval := 30 * time.Minute + interval := 10 * time.Minute if os.Getenv("OPENAUDIO_ENV") == "dev" { interval = 10 * time.Second } diff --git a/pkg/uptime/uptime.go b/pkg/uptime/uptime.go index 2d5a6ed0..1063e047 100644 --- a/pkg/uptime/uptime.go +++ b/pkg/uptime/uptime.go @@ -187,7 +187,7 @@ func toPeers(endpoints []*ethv1.ServiceEndpoint) []registrar.Peer { } func (u *Uptime) startPeerRefresher() { - interval := 30 * time.Minute + interval := 10 * time.Minute if os.Getenv("OPENAUDIO_ENV") == "dev" { interval = 10 * time.Second } From 77401e08a513433235506266d28889fb5052bc85 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Tue, 10 Feb 2026 11:30:59 -0800 Subject: [PATCH 2/6] Fix test by waiting for registration --- pkg/integration_tests/00_blocks_test.go | 2 +- .../00_manage_entity_test.go | 2 +- .../00_slash_proposal_test.go | 2 +- .../10_upload_stream_test.go | 2 +- pkg/integration_tests/12_rewards_test.go | 3 +- .../90_deregistration_test.go | 2 +- pkg/integration_tests/utils/nodes.go | 76 ++++++++++++++++++- 7 files changed, 79 insertions(+), 10 deletions(-) diff --git a/pkg/integration_tests/00_blocks_test.go b/pkg/integration_tests/00_blocks_test.go index 2dfcd5a2..1b96cf18 100644 --- a/pkg/integration_tests/00_blocks_test.go +++ b/pkg/integration_tests/00_blocks_test.go @@ -19,7 +19,7 @@ func TestBlockCreation(t *testing.T) { _, err := sdk.Core.Ping(ctx, connect.NewRequest(&corev1.PingRequest{})) assert.NoError(t, err) - err = utils.WaitForDevnetHealthy(30 * time.Second) + err = utils.WaitForDevnetHealthy() assert.NoError(t, err) var blockOne *corev1.Block diff --git a/pkg/integration_tests/00_manage_entity_test.go b/pkg/integration_tests/00_manage_entity_test.go index 1c1f28a1..fbc81b1d 100644 --- a/pkg/integration_tests/00_manage_entity_test.go +++ b/pkg/integration_tests/00_manage_entity_test.go @@ -19,7 +19,7 @@ func TestEntityManager(t *testing.T) { ctx := context.Background() sdk := utils.DiscoveryOne - err := utils.WaitForDevnetHealthy(60 * time.Second) + err := utils.WaitForDevnetHealthy() assert.NoError(t, err) // Generate a test private key diff --git a/pkg/integration_tests/00_slash_proposal_test.go b/pkg/integration_tests/00_slash_proposal_test.go index 8ce714e7..5840ee2d 100644 --- a/pkg/integration_tests/00_slash_proposal_test.go +++ b/pkg/integration_tests/00_slash_proposal_test.go @@ -35,7 +35,7 @@ func TestCanRetrieveSlashProposal(t *testing.T) { wsRpcUrl = "ws" + strings.TrimPrefix(wsRpcUrl, "http") } - err := utils.WaitForDevnetHealthy(30 * time.Second) + err := utils.WaitForDevnetHealthy() require.NoError(t, err) ethrpc, err := ethclient.Dial(wsRpcUrl) diff --git a/pkg/integration_tests/10_upload_stream_test.go b/pkg/integration_tests/10_upload_stream_test.go index 0c8f5125..abee56c1 100644 --- a/pkg/integration_tests/10_upload_stream_test.go +++ b/pkg/integration_tests/10_upload_stream_test.go @@ -25,7 +25,7 @@ import ( func TestUploadStream(t *testing.T) { ctx := context.Background() - require.NoError(t, utils.WaitForDevnetHealthy(30*time.Second)) + require.NoError(t, utils.WaitForDevnetHealthy()) serverAddr := "node3.oap.devnet" privKeyPath := "./assets/demo_key.txt" diff --git a/pkg/integration_tests/12_rewards_test.go b/pkg/integration_tests/12_rewards_test.go index bebea62e..a6536418 100644 --- a/pkg/integration_tests/12_rewards_test.go +++ b/pkg/integration_tests/12_rewards_test.go @@ -3,7 +3,6 @@ package integration_tests import ( "context" "testing" - "time" v1 "github.com/OpenAudio/go-openaudio/pkg/api/core/v1" "github.com/OpenAudio/go-openaudio/pkg/common" @@ -17,7 +16,7 @@ func TestRewardsLifecycle(t *testing.T) { nodeUrl := utils.DiscoveryOneRPC // Wait for devnet to be ready - if err := utils.WaitForDevnetHealthy(30 * time.Second); err != nil { + if err := utils.WaitForDevnetHealthy(); err != nil { t.Fatalf("Devnet not ready: %v", err) } diff --git a/pkg/integration_tests/90_deregistration_test.go b/pkg/integration_tests/90_deregistration_test.go index acceef8c..972ef25d 100644 --- a/pkg/integration_tests/90_deregistration_test.go +++ b/pkg/integration_tests/90_deregistration_test.go @@ -45,7 +45,7 @@ func TestDeregisterNode(t *testing.T) { wsRpcUrl = "ws" + strings.TrimPrefix(wsRpcUrl, "http") } - err := utils.WaitForDevnetHealthy(30 * time.Second) + err := utils.WaitForDevnetHealthy() require.NoError(t, err) ethrpc, err := ethclient.Dial(wsRpcUrl) diff --git a/pkg/integration_tests/utils/nodes.go b/pkg/integration_tests/utils/nodes.go index 8c78a961..1c66cbbe 100644 --- a/pkg/integration_tests/utils/nodes.go +++ b/pkg/integration_tests/utils/nodes.go @@ -3,6 +3,7 @@ package utils import ( "context" "crypto/tls" + "encoding/json" "errors" "net/http" "os" @@ -69,8 +70,12 @@ func EnsureProtocol(endpoint string) string { return endpoint } -func WaitForDevnetHealthy(timeout time.Duration) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) +func WaitForDevnetHealthy(timeout ...time.Duration) error { + timeoutDuration := 60 * time.Second + if len(timeout) > 0 { + timeoutDuration = timeout[0] + } + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) defer cancel() nodes := []*sdk.OpenAudioSDK{ @@ -80,14 +85,23 @@ func WaitForDevnetHealthy(timeout time.Duration) error { ContentThree, } + nodeAddresses := []string{ + ContentOneRPC, + ContentTwoRPC, + ContentThreeRPC, + } + ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() + client := NewTestHTTPClient() + for { select { case <-ctx.Done(): return errors.New("timed out waiting for devnet to be ready") case <-ticker.C: + // Check core services are ready allReady := true for _, n := range nodes { status, err := n.Core.GetStatus(context.Background(), connect.NewRequest(&corev1.GetStatusRequest{})) @@ -99,7 +113,63 @@ func WaitForDevnetHealthy(timeout time.Duration) error { break } } - if allReady { + if !allReady { + continue + } + + // Check mediorum services have wallets registered + allMediorumReady := true + for _, addr := range nodeAddresses { + baseURL := EnsureProtocol(addr) + if !strings.HasPrefix(baseURL, "http://") && !strings.HasPrefix(baseURL, "https://") { + baseURL = "https://" + baseURL + } + + healthURL := baseURL + "/health-check" + req, err := http.NewRequestWithContext(ctx, "GET", healthURL, nil) + if err != nil { + allMediorumReady = false + break + } + + resp, err := client.Do(req) + if err != nil { + allMediorumReady = false + break + } + + // Parse JSON response to check wallet_is_registered + var healthResponse struct { + Data struct { + WalletIsRegistered bool `json:"wallet_is_registered"` + } `json:"data"` + Storage struct { + WalletIsRegistered bool `json:"wallet_is_registered"` + } `json:"storage"` + } + + if resp.StatusCode == 200 { + if err := json.NewDecoder(resp.Body).Decode(&healthResponse); err == nil { + // Check either data.wallet_is_registered (legacy) or storage.wallet_is_registered (new format) + if !healthResponse.Data.WalletIsRegistered && !healthResponse.Storage.WalletIsRegistered { + resp.Body.Close() + allMediorumReady = false + break + } + resp.Body.Close() + } else { + resp.Body.Close() + allMediorumReady = false + break + } + } else { + resp.Body.Close() + allMediorumReady = false + break + } + } + + if allReady && allMediorumReady { return nil } } From 2194f2b2919cfa875b2a05b3b355fc0903b750e8 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Tue, 10 Feb 2026 12:03:05 -0800 Subject: [PATCH 3/6] Refresh wallet is registered status --- pkg/integration_tests/utils/nodes.go | 9 +-------- pkg/mediorum/server/server.go | 24 +++++++++++++++++++++++- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/pkg/integration_tests/utils/nodes.go b/pkg/integration_tests/utils/nodes.go index 1c66cbbe..62b90cd5 100644 --- a/pkg/integration_tests/utils/nodes.go +++ b/pkg/integration_tests/utils/nodes.go @@ -131,18 +131,12 @@ func WaitForDevnetHealthy(timeout ...time.Duration) error { allMediorumReady = false break } - resp, err := client.Do(req) if err != nil { allMediorumReady = false break } - - // Parse JSON response to check wallet_is_registered var healthResponse struct { - Data struct { - WalletIsRegistered bool `json:"wallet_is_registered"` - } `json:"data"` Storage struct { WalletIsRegistered bool `json:"wallet_is_registered"` } `json:"storage"` @@ -150,8 +144,7 @@ func WaitForDevnetHealthy(timeout ...time.Duration) error { if resp.StatusCode == 200 { if err := json.NewDecoder(resp.Body).Decode(&healthResponse); err == nil { - // Check either data.wallet_is_registered (legacy) or storage.wallet_is_registered (new format) - if !healthResponse.Data.WalletIsRegistered && !healthResponse.Storage.WalletIsRegistered { + if !healthResponse.Storage.WalletIsRegistered { resp.Body.Close() allMediorumReady = false break diff --git a/pkg/mediorum/server/server.go b/pkg/mediorum/server/server.go index 249e4a13..ecf18256 100644 --- a/pkg/mediorum/server/server.go +++ b/pkg/mediorum/server/server.go @@ -715,7 +715,29 @@ func (ss *MediorumServer) refreshPeersAndSigners(ctx context.Context) error { ss.Config.Peers = peers ss.Config.Signers = signers - ss.logger.Info("updated peers and signers dynamically", zap.Int("peers", len(peers)), zap.Int("signers", len(signers))) + // Update WalletIsRegistered based on updated peers list + ss.Config.WalletIsRegistered = false + for _, peer := range peers { + if strings.EqualFold(ss.Config.Self.Wallet, peer.Wallet) && strings.EqualFold(ss.Config.Self.Host, peer.Host) { + ss.Config.WalletIsRegistered = true + break + } + } + + // Log detailed info if not registered to help diagnose issues + if !ss.Config.WalletIsRegistered { + peerHosts := make([]string, len(peers)) + for i, p := range peers { + peerHosts[i] = p.Host + } + ss.logger.Warn("node not found in registered peers list", + zap.String("self_host", ss.Config.Self.Host), + zap.String("self_wallet", ss.Config.Self.Wallet), + zap.Int("total_peers", len(peers)), + zap.Strings("peer_hosts", peerHosts)) + } + + ss.logger.Info("updated peers and signers dynamically", zap.Int("peers", len(peers)), zap.Int("signers", len(signers)), zap.Bool("wallet_is_registered", ss.Config.WalletIsRegistered)) case <-ctx.Done(): return ctx.Err() } From 095fb0488a29cb0c6447ecaf82f564336faafcaa Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Tue, 10 Feb 2026 12:05:43 -0800 Subject: [PATCH 4/6] Refactor --- pkg/integration_tests/utils/nodes.go | 38 +++++++++++----------------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/pkg/integration_tests/utils/nodes.go b/pkg/integration_tests/utils/nodes.go index 62b90cd5..79b24f19 100644 --- a/pkg/integration_tests/utils/nodes.go +++ b/pkg/integration_tests/utils/nodes.go @@ -119,47 +119,39 @@ func WaitForDevnetHealthy(timeout ...time.Duration) error { // Check mediorum services have wallets registered allMediorumReady := true + var healthResponse struct { + Storage struct { + WalletIsRegistered bool `json:"wallet_is_registered"` + } `json:"storage"` + } + for _, addr := range nodeAddresses { - baseURL := EnsureProtocol(addr) - if !strings.HasPrefix(baseURL, "http://") && !strings.HasPrefix(baseURL, "https://") { + // Ensure https:// protocol + baseURL := addr + if !strings.HasPrefix(baseURL, "https://") && !strings.HasPrefix(baseURL, "http://") { baseURL = "https://" + baseURL + } else if strings.HasPrefix(baseURL, "http://") { + baseURL = strings.Replace(baseURL, "http://", "https://", 1) } - healthURL := baseURL + "/health-check" - req, err := http.NewRequestWithContext(ctx, "GET", healthURL, nil) + req, err := http.NewRequestWithContext(ctx, "GET", baseURL+"/health-check", nil) if err != nil { allMediorumReady = false break } + resp, err := client.Do(req) if err != nil { allMediorumReady = false break } - var healthResponse struct { - Storage struct { - WalletIsRegistered bool `json:"wallet_is_registered"` - } `json:"storage"` - } - if resp.StatusCode == 200 { - if err := json.NewDecoder(resp.Body).Decode(&healthResponse); err == nil { - if !healthResponse.Storage.WalletIsRegistered { - resp.Body.Close() - allMediorumReady = false - break - } - resp.Body.Close() - } else { - resp.Body.Close() - allMediorumReady = false - break - } - } else { + if resp.StatusCode != 200 || json.NewDecoder(resp.Body).Decode(&healthResponse) != nil || !healthResponse.Storage.WalletIsRegistered { resp.Body.Close() allMediorumReady = false break } + resp.Body.Close() } if allReady && allMediorumReady { From 97e4319952c46774b564a975cec2181a165bbca8 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Tue, 10 Feb 2026 12:20:25 -0800 Subject: [PATCH 5/6] Add verification step --- pkg/core/server/connect.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/core/server/connect.go b/pkg/core/server/connect.go index 3e8d76dd..5403933b 100644 --- a/pkg/core/server/connect.go +++ b/pkg/core/server/connect.go @@ -630,7 +630,13 @@ func (c *CoreService) SendTransaction(ctx context.Context, req *connect.Request[ // get all receipts by tx hash and use index to map to the correct message // get ERNs, MEADs, and PIES by tx hash and use index to map to the correct message - ernReceipts, err := c.core.db.GetERNReceipts(ctx, txhash) + // Retry query in case receipt was just committed (read consistency) + var ernReceipts, err = c.core.db.GetERNReceipts(ctx, txhash) + if err == nil && len(ernReceipts) == 0 { + // Retry once after brief delay for read consistency + time.Sleep(100 * time.Millisecond) + ernReceipts, err = c.core.db.GetERNReceipts(ctx, txhash) + } if err != nil { c.core.logger.Error("error getting ERN receipts", zap.Error(err)) } else { @@ -639,6 +645,7 @@ func (c *CoreService) SendTransaction(ctx context.Context, req *connect.Request[ err = proto.Unmarshal(ernReceipt.RawAcknowledgment, ernAck) if err != nil { c.core.logger.Error("error unmarshalling ERN receipt", zap.Error(err)) + continue } receipt.MessageReceipts[ernReceipt.Index] = &v1beta1.MessageReceipt{ MessageIndex: int32(ernReceipt.Index), From 18fcd0e6a22b79f51af24edf59e3c0b1ae67c224 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Tue, 10 Feb 2026 12:35:46 -0800 Subject: [PATCH 6/6] Revert "Add verification step" This reverts commit 97e4319952c46774b564a975cec2181a165bbca8. --- pkg/core/server/connect.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/pkg/core/server/connect.go b/pkg/core/server/connect.go index 5403933b..3e8d76dd 100644 --- a/pkg/core/server/connect.go +++ b/pkg/core/server/connect.go @@ -630,13 +630,7 @@ func (c *CoreService) SendTransaction(ctx context.Context, req *connect.Request[ // get all receipts by tx hash and use index to map to the correct message // get ERNs, MEADs, and PIES by tx hash and use index to map to the correct message - // Retry query in case receipt was just committed (read consistency) - var ernReceipts, err = c.core.db.GetERNReceipts(ctx, txhash) - if err == nil && len(ernReceipts) == 0 { - // Retry once after brief delay for read consistency - time.Sleep(100 * time.Millisecond) - ernReceipts, err = c.core.db.GetERNReceipts(ctx, txhash) - } + ernReceipts, err := c.core.db.GetERNReceipts(ctx, txhash) if err != nil { c.core.logger.Error("error getting ERN receipts", zap.Error(err)) } else { @@ -645,7 +639,6 @@ func (c *CoreService) SendTransaction(ctx context.Context, req *connect.Request[ err = proto.Unmarshal(ernReceipt.RawAcknowledgment, ernAck) if err != nil { c.core.logger.Error("error unmarshalling ERN receipt", zap.Error(err)) - continue } receipt.MessageReceipts[ernReceipt.Index] = &v1beta1.MessageReceipt{ MessageIndex: int32(ernReceipt.Index),